Make plpgsql use its DTYPE_REC code paths for composite-type variables.

Formerly, DTYPE_REC was used only for variables declared as "record";
variables of named composite types used DTYPE_ROW, which is faster for
some purposes but much less flexible.  In particular, the ROW code paths
are entirely incapable of dealing with DDL-caused changes to the number
or data types of the columns of a row variable, once a particular plpgsql
function has been parsed for the first time in a session.  And, since the
stored representation of a ROW isn't a tuple, there wasn't any easy way
to deal with variables of domain-over-composite types, since the domain
constraint checking code would expect the value to be checked to be a
tuple.  A lesser, but still real, annoyance is that ROW format cannot
represent a true NULL composite value, only a row of per-field NULL
values, which is not exactly the same thing.

Hence, switch to using DTYPE_REC for all composite-typed variables,
whether "record", named composite type, or domain over named composite
type.  DTYPE_ROW remains but is used only for its native purpose, to
represent a fixed-at-compile-time list of variables, for instance the
targets of an INTO clause.

To accomplish this without taking significant performance losses, introduce
infrastructure that allows storing composite-type variables as "expanded
objects", similar to the "expanded array" infrastructure introduced in
commit 1dc5ebc90.  A composite variable's value is thereby kept (most of
the time) in the form of separate Datums, so that field accesses and
updates are not much more expensive than they were in the ROW format.
This holds the line, more or less, on performance of variables of named
composite types in field-access-intensive microbenchmarks, and makes
variables declared "record" perform much better than before in similar
tests.  In addition, the logic involved with enforcing composite-domain
constraints against updates of individual fields is in the expanded
record infrastructure not plpgsql proper, so that it might be reusable
for other purposes.

In further support of this, introduce a typcache feature for assigning a
unique-within-process identifier to each distinct tuple descriptor of
interest; in particular, DDL alterations on composite types result in a new
identifier for that type.  This allows very cheap detection of the need to
refresh tupdesc-dependent data.  This improves on the "tupDescSeqNo" idea
I had in commit 687f096ea: that assigned identifying sequence numbers to
successive versions of individual composite types, but the numbers were not
unique across different types, nor was there support for assigning numbers
to registered record types.

In passing, allow plpgsql functions to accept as well as return type
"record".  There was no good reason for the old restriction, and it
was out of step with most of the other PLs.

Tom Lane, reviewed by Pavel Stehule

Discussion: https://postgr.es/m/8962.1514399547@sss.pgh.pa.us
This commit is contained in:
Tom Lane 2018-02-13 18:52:21 -05:00
parent 2ac3e6acc2
commit 4b93f57999
20 changed files with 4596 additions and 913 deletions

View File

@ -123,7 +123,9 @@
and they can return a result of any of these types. They can also
accept or return any composite type (row type) specified by name.
It is also possible to declare a <application>PL/pgSQL</application>
function as returning <type>record</type>, which means that the result
function as accepting <type>record</type>, which means that any
composite type will do as input, or
as returning <type>record</type>, which means that the result
is a row type whose columns are determined by specification in the
calling query, as discussed in <xref linkend="queries-tablefunctions"/>.
</para>
@ -671,14 +673,6 @@ user_id users.user_id%TYPE;
be selected from it, for example <literal>$1.user_id</literal>.
</para>
<para>
Only the user-defined columns of a table row are accessible in a
row-type variable, not the OID or other system columns (because the
row could be from a view). The fields of the row type inherit the
table's field size or precision for data types such as
<type>char(<replaceable>n</replaceable>)</type>.
</para>
<para>
Here is an example of using composite types. <structname>table1</structname>
and <structname>table2</structname> are existing tables having at least the

View File

@ -70,6 +70,7 @@
#include "utils/builtins.h"
#include "utils/date.h"
#include "utils/datum.h"
#include "utils/expandedrecord.h"
#include "utils/lsyscache.h"
#include "utils/timestamp.h"
#include "utils/typcache.h"
@ -2820,57 +2821,105 @@ ExecEvalFieldSelect(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
if (*op->resnull)
return;
/* Get the composite datum and extract its type fields */
tupDatum = *op->resvalue;
tuple = DatumGetHeapTupleHeader(tupDatum);
tupType = HeapTupleHeaderGetTypeId(tuple);
tupTypmod = HeapTupleHeaderGetTypMod(tuple);
/* Lookup tupdesc if first time through or if type changes */
tupDesc = get_cached_rowtype(tupType, tupTypmod,
&op->d.fieldselect.argdesc,
econtext);
/*
* Find field's attr record. Note we don't support system columns here: a
* datum tuple doesn't have valid values for most of the interesting
* system columns anyway.
*/
if (fieldnum <= 0) /* should never happen */
elog(ERROR, "unsupported reference to system column %d in FieldSelect",
fieldnum);
if (fieldnum > tupDesc->natts) /* should never happen */
elog(ERROR, "attribute number %d exceeds number of columns %d",
fieldnum, tupDesc->natts);
attr = TupleDescAttr(tupDesc, fieldnum - 1);
/* Check for dropped column, and force a NULL result if so */
if (attr->attisdropped)
/* We can special-case expanded records for speed */
if (VARATT_IS_EXTERNAL_EXPANDED(DatumGetPointer(tupDatum)))
{
*op->resnull = true;
return;
ExpandedRecordHeader *erh = (ExpandedRecordHeader *) DatumGetEOHP(tupDatum);
Assert(erh->er_magic == ER_MAGIC);
/* Extract record's TupleDesc */
tupDesc = expanded_record_get_tupdesc(erh);
/*
* Find field's attr record. Note we don't support system columns
* here: a datum tuple doesn't have valid values for most of the
* interesting system columns anyway.
*/
if (fieldnum <= 0) /* should never happen */
elog(ERROR, "unsupported reference to system column %d in FieldSelect",
fieldnum);
if (fieldnum > tupDesc->natts) /* should never happen */
elog(ERROR, "attribute number %d exceeds number of columns %d",
fieldnum, tupDesc->natts);
attr = TupleDescAttr(tupDesc, fieldnum - 1);
/* Check for dropped column, and force a NULL result if so */
if (attr->attisdropped)
{
*op->resnull = true;
return;
}
/* Check for type mismatch --- possible after ALTER COLUMN TYPE? */
/* As in CheckVarSlotCompatibility, we should but can't check typmod */
if (op->d.fieldselect.resulttype != attr->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("attribute %d has wrong type", fieldnum),
errdetail("Table has type %s, but query expects %s.",
format_type_be(attr->atttypid),
format_type_be(op->d.fieldselect.resulttype))));
/* extract the field */
*op->resvalue = expanded_record_get_field(erh, fieldnum,
op->resnull);
}
else
{
/* Get the composite datum and extract its type fields */
tuple = DatumGetHeapTupleHeader(tupDatum);
/* Check for type mismatch --- possible after ALTER COLUMN TYPE? */
/* As in CheckVarSlotCompatibility, we should but can't check typmod */
if (op->d.fieldselect.resulttype != attr->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("attribute %d has wrong type", fieldnum),
errdetail("Table has type %s, but query expects %s.",
format_type_be(attr->atttypid),
format_type_be(op->d.fieldselect.resulttype))));
tupType = HeapTupleHeaderGetTypeId(tuple);
tupTypmod = HeapTupleHeaderGetTypMod(tuple);
/* heap_getattr needs a HeapTuple not a bare HeapTupleHeader */
tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
tmptup.t_data = tuple;
/* Lookup tupdesc if first time through or if type changes */
tupDesc = get_cached_rowtype(tupType, tupTypmod,
&op->d.fieldselect.argdesc,
econtext);
/* extract the field */
*op->resvalue = heap_getattr(&tmptup,
fieldnum,
tupDesc,
op->resnull);
/*
* Find field's attr record. Note we don't support system columns
* here: a datum tuple doesn't have valid values for most of the
* interesting system columns anyway.
*/
if (fieldnum <= 0) /* should never happen */
elog(ERROR, "unsupported reference to system column %d in FieldSelect",
fieldnum);
if (fieldnum > tupDesc->natts) /* should never happen */
elog(ERROR, "attribute number %d exceeds number of columns %d",
fieldnum, tupDesc->natts);
attr = TupleDescAttr(tupDesc, fieldnum - 1);
/* Check for dropped column, and force a NULL result if so */
if (attr->attisdropped)
{
*op->resnull = true;
return;
}
/* Check for type mismatch --- possible after ALTER COLUMN TYPE? */
/* As in CheckVarSlotCompatibility, we should but can't check typmod */
if (op->d.fieldselect.resulttype != attr->atttypid)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("attribute %d has wrong type", fieldnum),
errdetail("Table has type %s, but query expects %s.",
format_type_be(attr->atttypid),
format_type_be(op->d.fieldselect.resulttype))));
/* heap_getattr needs a HeapTuple not a bare HeapTupleHeader */
tmptup.t_len = HeapTupleHeaderGetDatumLength(tuple);
tmptup.t_data = tuple;
/* extract the field */
*op->resvalue = heap_getattr(&tmptup,
fieldnum,
tupDesc,
op->resnull);
}
}
/*

View File

@ -12,7 +12,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = acl.o amutils.o arrayfuncs.o array_expanded.o array_selfuncs.o \
array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
encode.o enum.o expandeddatum.o \
encode.o enum.o expandeddatum.o expandedrecord.o \
float.o format_type.o formatting.o genfile.o \
geo_ops.o geo_selfuncs.o geo_spgist.o inet_cidr_ntop.o inet_net_pton.o \
int.o int8.o json.o jsonb.o jsonb_gin.o jsonb_op.o jsonb_util.o \

File diff suppressed because it is too large Load Diff

View File

@ -259,12 +259,22 @@ static const dshash_parameters srtr_typmod_table_params = {
LWTRANCHE_SESSION_TYPMOD_TABLE
};
/* hashtable for recognizing registered record types */
static HTAB *RecordCacheHash = NULL;
/* arrays of info about registered record types, indexed by assigned typmod */
static TupleDesc *RecordCacheArray = NULL;
static int32 RecordCacheArrayLen = 0; /* allocated length of array */
static uint64 *RecordIdentifierArray = NULL;
static int32 RecordCacheArrayLen = 0; /* allocated length of above arrays */
static int32 NextRecordTypmod = 0; /* number of entries used */
/*
* Process-wide counter for generating unique tupledesc identifiers.
* Zero and one (INVALID_TUPLEDESC_IDENTIFIER) aren't allowed to be chosen
* as identifiers, so we start the counter at INVALID_TUPLEDESC_IDENTIFIER.
*/
static uint64 tupledesc_id_counter = INVALID_TUPLEDESC_IDENTIFIER;
static void load_typcache_tupdesc(TypeCacheEntry *typentry);
static void load_rangetype_info(TypeCacheEntry *typentry);
static void load_domaintype_info(TypeCacheEntry *typentry);
@ -793,10 +803,10 @@ load_typcache_tupdesc(TypeCacheEntry *typentry)
typentry->tupDesc->tdrefcount++;
/*
* In future, we could take some pains to not increment the seqno if the
* tupdesc didn't really change; but for now it's not worth it.
* In future, we could take some pains to not change tupDesc_identifier if
* the tupdesc didn't really change; but for now it's not worth it.
*/
typentry->tupDescSeqNo++;
typentry->tupDesc_identifier = ++tupledesc_id_counter;
relation_close(rel, AccessShareLock);
}
@ -1496,7 +1506,8 @@ cache_range_element_properties(TypeCacheEntry *typentry)
}
/*
* Make sure that RecordCacheArray is large enough to store 'typmod'.
* Make sure that RecordCacheArray and RecordIdentifierArray are large enough
* to store 'typmod'.
*/
static void
ensure_record_cache_typmod_slot_exists(int32 typmod)
@ -1505,6 +1516,8 @@ ensure_record_cache_typmod_slot_exists(int32 typmod)
{
RecordCacheArray = (TupleDesc *)
MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc));
RecordIdentifierArray = (uint64 *)
MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(uint64));
RecordCacheArrayLen = 64;
}
@ -1519,6 +1532,10 @@ ensure_record_cache_typmod_slot_exists(int32 typmod)
newlen * sizeof(TupleDesc));
memset(RecordCacheArray + RecordCacheArrayLen, 0,
(newlen - RecordCacheArrayLen) * sizeof(TupleDesc));
RecordIdentifierArray = (uint64 *) repalloc(RecordIdentifierArray,
newlen * sizeof(uint64));
memset(RecordIdentifierArray + RecordCacheArrayLen, 0,
(newlen - RecordCacheArrayLen) * sizeof(uint64));
RecordCacheArrayLen = newlen;
}
}
@ -1581,11 +1598,17 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError)
/*
* Our local array can now point directly to the TupleDesc
* in shared memory.
* in shared memory, which is non-reference-counted.
*/
RecordCacheArray[typmod] = tupdesc;
Assert(tupdesc->tdrefcount == -1);
/*
* We don't share tupdesc identifiers across processes, so
* assign one locally.
*/
RecordIdentifierArray[typmod] = ++tupledesc_id_counter;
dshash_release_lock(CurrentSession->shared_typmod_table,
entry);
@ -1790,12 +1813,61 @@ assign_record_type_typmod(TupleDesc tupDesc)
RecordCacheArray[entDesc->tdtypmod] = entDesc;
recentry->tupdesc = entDesc;
/* Assign a unique tupdesc identifier, too. */
RecordIdentifierArray[entDesc->tdtypmod] = ++tupledesc_id_counter;
/* Update the caller's tuple descriptor. */
tupDesc->tdtypmod = entDesc->tdtypmod;
MemoryContextSwitchTo(oldcxt);
}
/*
* assign_record_type_identifier
*
* Get an identifier, which will be unique over the lifespan of this backend
* process, for the current tuple descriptor of the specified composite type.
* For named composite types, the value is guaranteed to change if the type's
* definition does. For registered RECORD types, the value will not change
* once assigned, since the registered type won't either. If an anonymous
* RECORD type is specified, we return a new identifier on each call.
*/
uint64
assign_record_type_identifier(Oid type_id, int32 typmod)
{
if (type_id != RECORDOID)
{
/*
* It's a named composite type, so use the regular typcache.
*/
TypeCacheEntry *typentry;
typentry = lookup_type_cache(type_id, TYPECACHE_TUPDESC);
if (typentry->tupDesc == NULL)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("type %s is not composite",
format_type_be(type_id))));
Assert(typentry->tupDesc_identifier != 0);
return typentry->tupDesc_identifier;
}
else
{
/*
* It's a transient record type, so look in our record-type table.
*/
if (typmod >= 0 && typmod < RecordCacheArrayLen &&
RecordCacheArray[typmod] != NULL)
{
Assert(RecordIdentifierArray[typmod] != 0);
return RecordIdentifierArray[typmod];
}
/* For anonymous or unrecognized record type, generate a new ID */
return ++tupledesc_id_counter;
}
}
/*
* Return the amout of shmem required to hold a SharedRecordTypmodRegistry.
* This exists only to avoid exposing private innards of

View File

@ -0,0 +1,227 @@
/*-------------------------------------------------------------------------
*
* expandedrecord.h
* Declarations for composite expanded objects.
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/utils/expandedrecord.h
*
*-------------------------------------------------------------------------
*/
#ifndef EXPANDEDRECORD_H
#define EXPANDEDRECORD_H
#include "access/htup.h"
#include "access/tupdesc.h"
#include "fmgr.h"
#include "utils/expandeddatum.h"
/*
* An expanded record is contained within a private memory context (as
* all expanded objects must be) and has a control structure as below.
*
* The expanded record might contain a regular "flat" tuple if that was the
* original input and we've not modified it. Otherwise, the contents are
* represented by Datum/isnull arrays plus type information. We could also
* have both forms, if we've deconstructed the original tuple for access
* purposes but not yet changed it. For pass-by-reference field types, the
* Datums would point into the flat tuple in this situation. Once we start
* modifying tuple fields, new pass-by-ref fields are separately palloc'd
* within the memory context.
*
* It's possible to build an expanded record that references a "flat" tuple
* stored externally, if the caller can guarantee that that tuple will not
* change for the lifetime of the expanded record. (This frammish is mainly
* meant to avoid unnecessary data copying in trigger functions.)
*/
#define ER_MAGIC 1384727874 /* ID for debugging crosschecks */
typedef struct ExpandedRecordHeader
{
/* Standard header for expanded objects */
ExpandedObjectHeader hdr;
/* Magic value identifying an expanded record (for debugging only) */
int er_magic;
/* Assorted flag bits */
int flags;
#define ER_FLAG_FVALUE_VALID 0x0001 /* fvalue is up to date? */
#define ER_FLAG_FVALUE_ALLOCED 0x0002 /* fvalue is local storage? */
#define ER_FLAG_DVALUES_VALID 0x0004 /* dvalues/dnulls are up to date? */
#define ER_FLAG_DVALUES_ALLOCED 0x0008 /* any field values local storage? */
#define ER_FLAG_HAVE_EXTERNAL 0x0010 /* any field values are external? */
#define ER_FLAG_TUPDESC_ALLOCED 0x0020 /* tupdesc is local storage? */
#define ER_FLAG_IS_DOMAIN 0x0040 /* er_decltypeid is domain? */
#define ER_FLAG_IS_DUMMY 0x0080 /* this header is dummy (see below) */
/* flag bits that are not to be cleared when replacing tuple data: */
#define ER_FLAGS_NON_DATA \
(ER_FLAG_TUPDESC_ALLOCED | ER_FLAG_IS_DOMAIN | ER_FLAG_IS_DUMMY)
/* Declared type of the record variable (could be a domain type) */
Oid er_decltypeid;
/*
* Actual composite type/typmod; never a domain (if ER_FLAG_IS_DOMAIN,
* these identify the composite base type). These will match
* er_tupdesc->tdtypeid/tdtypmod, as well as the header fields of
* composite datums made from or stored in this expanded record.
*/
Oid er_typeid; /* type OID of the composite type */
int32 er_typmod; /* typmod of the composite type */
/*
* Tuple descriptor, if we have one, else NULL. This may point to a
* reference-counted tupdesc originally belonging to the typcache, in
* which case we use a memory context reset callback to release the
* refcount. It can also be locally allocated in this object's private
* context (in which case ER_FLAG_TUPDESC_ALLOCED is set).
*/
TupleDesc er_tupdesc;
/*
* Unique-within-process identifier for the tupdesc (see typcache.h). This
* field will never be equal to INVALID_TUPLEDESC_IDENTIFIER.
*/
uint64 er_tupdesc_id;
/*
* If we have a Datum-array representation of the record, it's kept here;
* else ER_FLAG_DVALUES_VALID is not set, and dvalues/dnulls may be NULL
* if they've not yet been allocated. If allocated, the dvalues and
* dnulls arrays are palloc'd within the object private context, and are
* of length matching er_tupdesc->natts. For pass-by-ref field types,
* dvalues entries might point either into the fstartptr..fendptr area, or
* to separately palloc'd chunks.
*/
Datum *dvalues; /* array of Datums */
bool *dnulls; /* array of is-null flags for Datums */
int nfields; /* length of above arrays */
/*
* flat_size is the current space requirement for the flat equivalent of
* the expanded record, if known; otherwise it's 0. We store this to make
* consecutive calls of get_flat_size cheap. If flat_size is not 0, the
* component values data_len, hoff, and hasnull must be valid too.
*/
Size flat_size;
Size data_len; /* data len within flat_size */
int hoff; /* header offset */
bool hasnull; /* null bitmap needed? */
/*
* fvalue points to the flat representation if we have one, else it is
* NULL. If the flat representation is valid (up to date) then
* ER_FLAG_FVALUE_VALID is set. Even if we've outdated the flat
* representation due to changes of user fields, it can still be used to
* fetch system column values. If we have a flat representation then
* fstartptr/fendptr point to the start and end+1 of its data area; this
* is so that we can tell which Datum pointers point into the flat
* representation rather than being pointers to separately palloc'd data.
*/
HeapTuple fvalue; /* might or might not be private storage */
char *fstartptr; /* start of its data area */
char *fendptr; /* end+1 of its data area */
/* Working state for domain checking, used if ER_FLAG_IS_DOMAIN is set */
MemoryContext er_domain_check_cxt; /* short-term memory context */
struct ExpandedRecordHeader *er_dummy_header; /* dummy record header */
void *er_domaininfo; /* cache space for domain_check() */
/* Callback info (it's active if er_mcb.arg is not NULL) */
MemoryContextCallback er_mcb;
} ExpandedRecordHeader;
/* fmgr macros for expanded record objects */
#define PG_GETARG_EXPANDED_RECORD(n) DatumGetExpandedRecord(PG_GETARG_DATUM(n))
#define ExpandedRecordGetDatum(erh) EOHPGetRWDatum(&(erh)->hdr)
#define ExpandedRecordGetRODatum(erh) EOHPGetRODatum(&(erh)->hdr)
#define PG_RETURN_EXPANDED_RECORD(x) PG_RETURN_DATUM(ExpandedRecordGetDatum(x))
/* assorted other macros */
#define ExpandedRecordIsEmpty(erh) \
(((erh)->flags & (ER_FLAG_DVALUES_VALID | ER_FLAG_FVALUE_VALID)) == 0)
#define ExpandedRecordIsDomain(erh) \
(((erh)->flags & ER_FLAG_IS_DOMAIN) != 0)
/* this can substitute for TransferExpandedObject() when we already have erh */
#define TransferExpandedRecord(erh, cxt) \
MemoryContextSetParent((erh)->hdr.eoh_context, cxt)
/* information returned by expanded_record_lookup_field() */
typedef struct ExpandedRecordFieldInfo
{
int fnumber; /* field's attr number in record */
Oid ftypeid; /* field's type/typmod info */
int32 ftypmod;
Oid fcollation; /* field's collation if any */
} ExpandedRecordFieldInfo;
/*
* prototypes for functions defined in expandedrecord.c
*/
extern ExpandedRecordHeader *make_expanded_record_from_typeid(Oid type_id, int32 typmod,
MemoryContext parentcontext);
extern ExpandedRecordHeader *make_expanded_record_from_tupdesc(TupleDesc tupdesc,
MemoryContext parentcontext);
extern ExpandedRecordHeader *make_expanded_record_from_exprecord(ExpandedRecordHeader *olderh,
MemoryContext parentcontext);
extern void expanded_record_set_tuple(ExpandedRecordHeader *erh,
HeapTuple tuple, bool copy);
extern Datum make_expanded_record_from_datum(Datum recorddatum,
MemoryContext parentcontext);
extern TupleDesc expanded_record_fetch_tupdesc(ExpandedRecordHeader *erh);
extern HeapTuple expanded_record_get_tuple(ExpandedRecordHeader *erh);
extern ExpandedRecordHeader *DatumGetExpandedRecord(Datum d);
extern void deconstruct_expanded_record(ExpandedRecordHeader *erh);
extern bool expanded_record_lookup_field(ExpandedRecordHeader *erh,
const char *fieldname,
ExpandedRecordFieldInfo *finfo);
extern Datum expanded_record_fetch_field(ExpandedRecordHeader *erh, int fnumber,
bool *isnull);
extern void expanded_record_set_field_internal(ExpandedRecordHeader *erh,
int fnumber,
Datum newValue, bool isnull,
bool check_constraints);
extern void expanded_record_set_fields(ExpandedRecordHeader *erh,
const Datum *newValues, const bool *isnulls);
/* outside code should never call expanded_record_set_field_internal as such */
#define expanded_record_set_field(erh, fnumber, newValue, isnull) \
expanded_record_set_field_internal(erh, fnumber, newValue, isnull, true)
/*
* Inline-able fast cases. The expanded_record_fetch_xxx functions above
* handle the general cases.
*/
/* Get the tupdesc for the expanded record's actual type */
static inline TupleDesc
expanded_record_get_tupdesc(ExpandedRecordHeader *erh)
{
if (likely(erh->er_tupdesc != NULL))
return erh->er_tupdesc;
else
return expanded_record_fetch_tupdesc(erh);
}
/* Get value of record field */
static inline Datum
expanded_record_get_field(ExpandedRecordHeader *erh, int fnumber,
bool *isnull)
{
if ((erh->flags & ER_FLAG_DVALUES_VALID) &&
likely(fnumber > 0 && fnumber <= erh->nfields))
{
*isnull = erh->dnulls[fnumber - 1];
return erh->dvalues[fnumber - 1];
}
else
return expanded_record_fetch_field(erh, fnumber, isnull);
}
#endif /* EXPANDEDRECORD_H */

View File

@ -76,11 +76,14 @@ typedef struct TypeCacheEntry
/*
* Tuple descriptor if it's a composite type (row type). NULL if not
* composite or information hasn't yet been requested. (NOTE: this is a
* reference-counted tupledesc.) To simplify caching dependent info,
* tupDescSeqNo is incremented each time tupDesc is rebuilt in a session.
* reference-counted tupledesc.)
*
* To simplify caching dependent info, tupDesc_identifier is an identifier
* for this tupledesc that is unique for the life of the process, and
* changes anytime the tupledesc does. Zero if not yet determined.
*/
TupleDesc tupDesc;
int64 tupDescSeqNo;
uint64 tupDesc_identifier;
/*
* Fields computed when TYPECACHE_RANGE_INFO is requested. Zeroes if not
@ -138,6 +141,9 @@ typedef struct TypeCacheEntry
#define TYPECACHE_HASH_EXTENDED_PROC 0x4000
#define TYPECACHE_HASH_EXTENDED_PROC_FINFO 0x8000
/* This value will not equal any valid tupledesc identifier, nor 0 */
#define INVALID_TUPLEDESC_IDENTIFIER ((uint64) 1)
/*
* Callers wishing to maintain a long-lived reference to a domain's constraint
* set must store it in one of these. Use InitDomainConstraintRef() and
@ -179,6 +185,8 @@ extern TupleDesc lookup_rowtype_tupdesc_domain(Oid type_id, int32 typmod,
extern void assign_record_type_typmod(TupleDesc tupDesc);
extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod);
extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
extern size_t SharedRecordTypmodRegistryEstimate(void);

View File

@ -26,7 +26,7 @@ DATA = plpgsql.control plpgsql--1.0.sql plpgsql--unpackaged--1.0.sql
REGRESS_OPTS = --dbname=$(PL_TESTDB)
REGRESS = plpgsql_call plpgsql_control plpgsql_transaction
REGRESS = plpgsql_call plpgsql_control plpgsql_record plpgsql_transaction
all: all-lib

View File

@ -0,0 +1,662 @@
--
-- Tests for PL/pgSQL handling of composite (record) variables
--
create type two_int4s as (f1 int4, f2 int4);
create type two_int8s as (q1 int8, q2 int8);
-- base-case return of a composite type
create function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1,1)::two_int8s; end $$;
select retc(42);
retc
--------
(42,1)
(1 row)
-- ok to return a matching record type
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1::int8, 1::int8); end $$;
select retc(42);
retc
--------
(42,1)
(1 row)
-- we don't currently support implicit casting
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1,1); end $$;
select retc(42);
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 1.
CONTEXT: PL/pgSQL function retc(integer) while casting return value to function's return type
-- nor extra columns
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1::int8, 1::int8, 42); end $$;
select retc(42);
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function retc(integer) while casting return value to function's return type
-- same cases with an intermediate "record" variable
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1::int8, 1::int8); return r; end $$;
select retc(42);
retc
--------
(42,1)
(1 row)
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1,1); return r; end $$;
select retc(42);
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 1.
CONTEXT: PL/pgSQL function retc(integer) while casting return value to function's return type
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1::int8, 1::int8, 42); return r; end $$;
select retc(42);
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function retc(integer) while casting return value to function's return type
-- but, for mostly historical reasons, we do convert when assigning
-- to a named-composite-type variable
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r two_int8s; begin r := row($1::int8, 1::int8, 42); return r; end $$;
select retc(42);
retc
--------
(42,1)
(1 row)
do $$ declare c two_int8s;
begin c := row(1,2); raise notice 'c = %', c; end$$;
NOTICE: c = (1,2)
do $$ declare c two_int8s;
begin for c in select 1,2 loop raise notice 'c = %', c; end loop; end$$;
NOTICE: c = (1,2)
do $$ declare c4 two_int4s; c8 two_int8s;
begin
c8 := row(1,2);
c4 := c8;
c8 := c4;
raise notice 'c4 = %', c4;
raise notice 'c8 = %', c8;
end$$;
NOTICE: c4 = (1,2)
NOTICE: c8 = (1,2)
-- check passing composite result to another function
create function getq1(two_int8s) returns int8 language plpgsql as $$
declare r two_int8s; begin r := $1; return r.q1; end $$;
select getq1(retc(344));
getq1
-------
344
(1 row)
select getq1(row(1,2));
getq1
-------
1
(1 row)
do $$
declare r1 two_int8s; r2 record; x int8;
begin
r1 := retc(345);
perform getq1(r1);
x := getq1(r1);
raise notice 'x = %', x;
r2 := retc(346);
perform getq1(r2);
x := getq1(r2);
raise notice 'x = %', x;
end$$;
NOTICE: x = 345
NOTICE: x = 346
-- check assignments of composites
do $$
declare r1 two_int8s; r2 two_int8s; r3 record; r4 record;
begin
r1 := row(1,2);
raise notice 'r1 = %', r1;
r1 := r1; -- shouldn't do anything
raise notice 'r1 = %', r1;
r2 := r1;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r2.q2 = r1.q1 + 3; -- check that r2 has distinct storage
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r1 := null;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r1 := row(7,11)::two_int8s;
r2 := r1;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r3 := row(1,2);
r4 := r3;
raise notice 'r3 = %', r3;
raise notice 'r4 = %', r4;
r4.f1 := r4.f1 + 3; -- check that r4 has distinct storage
raise notice 'r3 = %', r3;
raise notice 'r4 = %', r4;
r1 := r3;
raise notice 'r1 = %', r1;
r4 := r1;
raise notice 'r4 = %', r4;
r4.q2 := r4.q2 + 1; -- r4's field names have changed
raise notice 'r4 = %', r4;
end$$;
NOTICE: r1 = (1,2)
NOTICE: r1 = (1,2)
NOTICE: r1 = (1,2)
NOTICE: r2 = (1,2)
NOTICE: r1 = (1,2)
NOTICE: r2 = (1,4)
NOTICE: r1 = <NULL>
NOTICE: r2 = (1,4)
NOTICE: r1 = (7,11)
NOTICE: r2 = (7,11)
NOTICE: r3 = (1,2)
NOTICE: r4 = (1,2)
NOTICE: r3 = (1,2)
NOTICE: r4 = (4,2)
NOTICE: r1 = (1,2)
NOTICE: r4 = (1,2)
NOTICE: r4 = (1,3)
-- fields of named-type vars read as null if uninitialized
do $$
declare r1 two_int8s;
begin
raise notice 'r1 = %', r1;
raise notice 'r1.q1 = %', r1.q1;
raise notice 'r1.q2 = %', r1.q2;
raise notice 'r1 = %', r1;
end$$;
NOTICE: r1 = <NULL>
NOTICE: r1.q1 = <NULL>
NOTICE: r1.q2 = <NULL>
NOTICE: r1 = <NULL>
do $$
declare r1 two_int8s;
begin
raise notice 'r1.q1 = %', r1.q1;
raise notice 'r1.q2 = %', r1.q2;
raise notice 'r1 = %', r1;
raise notice 'r1.nosuchfield = %', r1.nosuchfield;
end$$;
NOTICE: r1.q1 = <NULL>
NOTICE: r1.q2 = <NULL>
NOTICE: r1 = <NULL>
ERROR: record "r1" has no field "nosuchfield"
CONTEXT: SQL statement "SELECT r1.nosuchfield"
PL/pgSQL function inline_code_block line 7 at RAISE
-- records, not so much
do $$
declare r1 record;
begin
raise notice 'r1 = %', r1;
raise notice 'r1.f1 = %', r1.f1;
raise notice 'r1.f2 = %', r1.f2;
raise notice 'r1 = %', r1;
end$$;
NOTICE: r1 = <NULL>
ERROR: record "r1" is not assigned yet
DETAIL: The tuple structure of a not-yet-assigned record is indeterminate.
CONTEXT: SQL statement "SELECT r1.f1"
PL/pgSQL function inline_code_block line 5 at RAISE
-- but OK if you assign first
do $$
declare r1 record;
begin
raise notice 'r1 = %', r1;
r1 := row(1,2);
raise notice 'r1.f1 = %', r1.f1;
raise notice 'r1.f2 = %', r1.f2;
raise notice 'r1 = %', r1;
raise notice 'r1.nosuchfield = %', r1.nosuchfield;
end$$;
NOTICE: r1 = <NULL>
NOTICE: r1.f1 = 1
NOTICE: r1.f2 = 2
NOTICE: r1 = (1,2)
ERROR: record "r1" has no field "nosuchfield"
CONTEXT: SQL statement "SELECT r1.nosuchfield"
PL/pgSQL function inline_code_block line 9 at RAISE
-- check repeated assignments to composite fields
create table some_table (id int, data text);
do $$
declare r some_table;
begin
r := (23, 'skidoo');
for i in 1 .. 10 loop
r.id := r.id + i;
r.data := r.data || ' ' || i;
end loop;
raise notice 'r = %', r;
end$$;
NOTICE: r = (78,"skidoo 1 2 3 4 5 6 7 8 9 10")
-- check behavior of function declared to return "record"
create function returnsrecord(int) returns record language plpgsql as
$$ begin return row($1,$1+1); end $$;
select returnsrecord(42);
returnsrecord
---------------
(42,43)
(1 row)
select * from returnsrecord(42) as r(x int, y int);
x | y
----+----
42 | 43
(1 row)
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (2) does not match expected column count (3).
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
select * from returnsrecord(42) as r(x int, y bigint); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 2.
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
-- same with an intermediate record variable
create or replace function returnsrecord(int) returns record language plpgsql as
$$ declare r record; begin r := row($1,$1+1); return r; end $$;
select returnsrecord(42);
returnsrecord
---------------
(42,43)
(1 row)
select * from returnsrecord(42) as r(x int, y int);
x | y
----+----
42 | 43
(1 row)
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (2) does not match expected column count (3).
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
select * from returnsrecord(42) as r(x int, y bigint); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 2.
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
-- should work the same with a missing column in the actual result value
create table has_hole(f1 int, f2 int, f3 int);
alter table has_hole drop column f2;
create or replace function returnsrecord(int) returns record language plpgsql as
$$ begin return row($1,$1+1)::has_hole; end $$;
select returnsrecord(42);
returnsrecord
---------------
(42,43)
(1 row)
select * from returnsrecord(42) as r(x int, y int);
x | y
----+----
42 | 43
(1 row)
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (2) does not match expected column count (3).
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
select * from returnsrecord(42) as r(x int, y bigint); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 2.
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
-- same with an intermediate record variable
create or replace function returnsrecord(int) returns record language plpgsql as
$$ declare r record; begin r := row($1,$1+1)::has_hole; return r; end $$;
select returnsrecord(42);
returnsrecord
---------------
(42,43)
(1 row)
select * from returnsrecord(42) as r(x int, y int);
x | y
----+----
42 | 43
(1 row)
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (2) does not match expected column count (3).
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
select * from returnsrecord(42) as r(x int, y bigint); -- fail
ERROR: returned record type does not match expected record type
DETAIL: Returned type integer does not match expected type bigint in column 2.
CONTEXT: PL/pgSQL function returnsrecord(integer) while casting return value to function's return type
-- check access to a field of an argument declared "record"
create function getf1(x record) returns int language plpgsql as
$$ begin return x.f1; end $$;
select getf1(1);
ERROR: function getf1(integer) does not exist
LINE 1: select getf1(1);
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
select getf1(row(1,2));
getf1
-------
1
(1 row)
select getf1(row(1,2)::two_int8s);
ERROR: record "x" has no field "f1"
CONTEXT: PL/pgSQL function getf1(record) line 1 at RETURN
select getf1(row(1,2));
getf1
-------
1
(1 row)
-- check behavior when assignment to FOR-loop variable requires coercion
do $$
declare r two_int8s;
begin
for r in select i, i+1 from generate_series(1,4) i
loop
raise notice 'r = %', r;
end loop;
end$$;
NOTICE: r = (1,2)
NOTICE: r = (2,3)
NOTICE: r = (3,4)
NOTICE: r = (4,5)
-- check behavior when returning setof composite
create function returnssetofholes() returns setof has_hole language plpgsql as
$$
declare r record;
h has_hole;
begin
return next h;
r := (1,2);
h := (3,4);
return next r;
return next h;
return next row(5,6);
return next row(7,8)::has_hole;
end$$;
select returnssetofholes();
returnssetofholes
-------------------
(,)
(1,2)
(3,4)
(5,6)
(7,8)
(5 rows)
create or replace function returnssetofholes() returns setof has_hole language plpgsql as
$$
declare r record;
begin
return next r; -- fails, not assigned yet
end$$;
select returnssetofholes();
ERROR: record "r" is not assigned yet
DETAIL: The tuple structure of a not-yet-assigned record is indeterminate.
CONTEXT: PL/pgSQL function returnssetofholes() line 4 at RETURN NEXT
create or replace function returnssetofholes() returns setof has_hole language plpgsql as
$$
begin
return next row(1,2,3); -- fails
end$$;
select returnssetofholes();
ERROR: returned record type does not match expected record type
DETAIL: Number of returned columns (3) does not match expected column count (2).
CONTEXT: PL/pgSQL function returnssetofholes() line 3 at RETURN NEXT
-- check behavior with changes of a named rowtype
create table mutable(f1 int, f2 text);
create function sillyaddone(int) returns int language plpgsql as
$$ declare r mutable; begin r.f1 := $1; return r.f1 + 1; end $$;
select sillyaddone(42);
sillyaddone
-------------
43
(1 row)
alter table mutable drop column f1;
alter table mutable add column f1 float8;
-- currently, this fails due to cached plan for "r.f1 + 1" expression
select sillyaddone(42);
ERROR: type of parameter 4 (double precision) does not match that when preparing the plan (integer)
CONTEXT: PL/pgSQL function sillyaddone(integer) line 1 at RETURN
\c -
-- but it's OK after a reconnect
select sillyaddone(42);
sillyaddone
-------------
43
(1 row)
alter table mutable drop column f1;
select sillyaddone(42); -- fail
ERROR: record "r" has no field "f1"
CONTEXT: PL/pgSQL function sillyaddone(integer) line 1 at assignment
create function getf3(x mutable) returns int language plpgsql as
$$ begin return x.f3; end $$;
select getf3(null::mutable); -- doesn't work yet
ERROR: record "x" has no field "f3"
CONTEXT: SQL statement "SELECT x.f3"
PL/pgSQL function getf3(mutable) line 1 at RETURN
alter table mutable add column f3 int;
select getf3(null::mutable); -- now it works
getf3
-------
(1 row)
alter table mutable drop column f3;
select getf3(null::mutable); -- fails again
ERROR: record "x" has no field "f3"
CONTEXT: PL/pgSQL function getf3(mutable) line 1 at RETURN
-- check access to system columns in a record variable
create function sillytrig() returns trigger language plpgsql as
$$begin
raise notice 'old.ctid = %', old.ctid;
raise notice 'old.tableoid = %', old.tableoid::regclass;
return new;
end$$;
create trigger mutable_trig before update on mutable for each row
execute procedure sillytrig();
insert into mutable values ('foo'), ('bar');
update mutable set f2 = f2 || ' baz';
NOTICE: old.ctid = (0,1)
NOTICE: old.tableoid = mutable
NOTICE: old.ctid = (0,2)
NOTICE: old.tableoid = mutable
table mutable;
f2
---------
foo baz
bar baz
(2 rows)
-- check returning a composite datum from a trigger
create or replace function sillytrig() returns trigger language plpgsql as
$$begin
return row(new.*);
end$$;
update mutable set f2 = f2 || ' baz';
table mutable;
f2
-------------
foo baz baz
bar baz baz
(2 rows)
create or replace function sillytrig() returns trigger language plpgsql as
$$declare r record;
begin
r := row(new.*);
return r;
end$$;
update mutable set f2 = f2 || ' baz';
table mutable;
f2
-----------------
foo baz baz baz
bar baz baz baz
(2 rows)
--
-- Domains of composite
--
create domain ordered_int8s as two_int8s check((value).q1 <= (value).q2);
create function read_ordered_int8s(p ordered_int8s) returns int8 as $$
begin return p.q1 + p.q2; end
$$ language plpgsql;
select read_ordered_int8s(row(1, 2));
read_ordered_int8s
--------------------
3
(1 row)
select read_ordered_int8s(row(2, 1)); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
create function build_ordered_int8s(i int8, j int8) returns ordered_int8s as $$
begin return row(i,j); end
$$ language plpgsql;
select build_ordered_int8s(1,2);
build_ordered_int8s
---------------------
(1,2)
(1 row)
select build_ordered_int8s(2,1); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function build_ordered_int8s(bigint,bigint) while casting return value to function's return type
create function build_ordered_int8s_2(i int8, j int8) returns ordered_int8s as $$
declare r record; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_2(1,2);
build_ordered_int8s_2
-----------------------
(1,2)
(1 row)
select build_ordered_int8s_2(2,1); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function build_ordered_int8s_2(bigint,bigint) while casting return value to function's return type
create function build_ordered_int8s_3(i int8, j int8) returns ordered_int8s as $$
declare r two_int8s; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_3(1,2);
build_ordered_int8s_3
-----------------------
(1,2)
(1 row)
select build_ordered_int8s_3(2,1); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function build_ordered_int8s_3(bigint,bigint) while casting return value to function's return type
create function build_ordered_int8s_4(i int8, j int8) returns ordered_int8s as $$
declare r ordered_int8s; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_4(1,2);
build_ordered_int8s_4
-----------------------
(1,2)
(1 row)
select build_ordered_int8s_4(2,1); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function build_ordered_int8s_4(bigint,bigint) line 2 at assignment
create function build_ordered_int8s_a(i int8, j int8) returns ordered_int8s[] as $$
begin return array[row(i,j), row(i,j+1)]; end
$$ language plpgsql;
select build_ordered_int8s_a(1,2);
build_ordered_int8s_a
-----------------------
{"(1,2)","(1,3)"}
(1 row)
select build_ordered_int8s_a(2,1); -- fail
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function build_ordered_int8s_a(bigint,bigint) while casting return value to function's return type
-- check field assignment
do $$
declare r ordered_int8s;
begin
r.q1 := null;
r.q2 := 43;
r.q1 := 42;
r.q2 := 41; -- fail
end$$;
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function inline_code_block line 7 at assignment
-- check whole-row assignment
do $$
declare r ordered_int8s;
begin
r := null;
r := row(null,null);
r := row(1,2);
r := row(2,1); -- fail
end$$;
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function inline_code_block line 7 at assignment
-- check assignment in for-loop
do $$
declare r ordered_int8s;
begin
for r in values (1,2),(3,4),(6,5) loop
raise notice 'r = %', r;
end loop;
end$$;
NOTICE: r = (1,2)
NOTICE: r = (3,4)
ERROR: value for domain ordered_int8s violates check constraint "ordered_int8s_check"
CONTEXT: PL/pgSQL function inline_code_block line 4 at FOR over SELECT rows
-- check behavior with toastable fields, too
create type two_texts as (f1 text, f2 text);
create domain ordered_texts as two_texts check((value).f1 <= (value).f2);
create table sometable (id int, a text, b text);
-- b should be compressed, but in-line
insert into sometable values (1, 'a', repeat('ffoob',1000));
-- this b should be out-of-line
insert into sometable values (2, 'a', repeat('ffoob',100000));
-- this pair should fail the domain check
insert into sometable values (3, 'z', repeat('ffoob',100000));
do $$
declare d ordered_texts;
begin
for d in select a, b from sometable loop
raise notice 'succeeded at "%"', d.f1;
end loop;
end$$;
NOTICE: succeeded at "a"
NOTICE: succeeded at "a"
ERROR: value for domain ordered_texts violates check constraint "ordered_texts_check"
CONTEXT: PL/pgSQL function inline_code_block line 4 at FOR over SELECT rows
do $$
declare r record; d ordered_texts;
begin
for r in select * from sometable loop
raise notice 'processing row %', r.id;
d := row(r.a, r.b);
end loop;
end$$;
NOTICE: processing row 1
NOTICE: processing row 2
NOTICE: processing row 3
ERROR: value for domain ordered_texts violates check constraint "ordered_texts_check"
CONTEXT: PL/pgSQL function inline_code_block line 6 at assignment
do $$
declare r record; d ordered_texts;
begin
for r in select * from sometable loop
raise notice 'processing row %', r.id;
d := null;
d.f1 := r.a;
d.f2 := r.b;
end loop;
end$$;
NOTICE: processing row 1
NOTICE: processing row 2
NOTICE: processing row 3
ERROR: value for domain ordered_texts violates check constraint "ordered_texts_check"
CONTEXT: PL/pgSQL function inline_code_block line 8 at assignment

View File

@ -32,6 +32,7 @@
#include "utils/regproc.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
#include "plpgsql.h"
@ -104,7 +105,6 @@ static Node *plpgsql_param_ref(ParseState *pstate, ParamRef *pref);
static Node *resolve_column_ref(ParseState *pstate, PLpgSQL_expr *expr,
ColumnRef *cref, bool error_if_no_field);
static Node *make_datum_param(PLpgSQL_expr *expr, int dno, int location);
static PLpgSQL_row *build_row_from_class(Oid classOid);
static PLpgSQL_row *build_row_from_vars(PLpgSQL_variable **vars, int numvars);
static PLpgSQL_type *build_datatype(HeapTuple typeTup, int32 typmod, Oid collation);
static void plpgsql_start_datums(void);
@ -425,8 +425,7 @@ do_compile(FunctionCallInfo fcinfo,
/* Disallow pseudotype argument */
/* (note we already replaced polymorphic types) */
/* (build_variable would do this, but wrong message) */
if (argdtype->ttype != PLPGSQL_TTYPE_SCALAR &&
argdtype->ttype != PLPGSQL_TTYPE_ROW)
if (argdtype->ttype == PLPGSQL_TTYPE_PSEUDO)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PL/pgSQL functions cannot accept type %s",
@ -447,8 +446,8 @@ do_compile(FunctionCallInfo fcinfo,
}
else
{
Assert(argvariable->dtype == PLPGSQL_DTYPE_ROW);
argitemtype = PLPGSQL_NSTYPE_ROW;
Assert(argvariable->dtype == PLPGSQL_DTYPE_REC);
argitemtype = PLPGSQL_NSTYPE_REC;
}
/* Remember arguments in appropriate arrays */
@ -557,29 +556,25 @@ do_compile(FunctionCallInfo fcinfo,
format_type_be(rettypeid))));
}
if (typeStruct->typrelid != InvalidOid ||
rettypeid == RECORDOID)
function->fn_retistuple = true;
else
{
function->fn_retbyval = typeStruct->typbyval;
function->fn_rettyplen = typeStruct->typlen;
function->fn_retistuple = type_is_rowtype(rettypeid);
function->fn_retbyval = typeStruct->typbyval;
function->fn_rettyplen = typeStruct->typlen;
/*
* install $0 reference, but only for polymorphic return
* types, and not when the return is specified through an
* output parameter.
*/
if (IsPolymorphicType(procStruct->prorettype) &&
num_out_args == 0)
{
(void) plpgsql_build_variable("$0", 0,
build_datatype(typeTup,
-1,
function->fn_input_collation),
true);
}
/*
* install $0 reference, but only for polymorphic return
* types, and not when the return is specified through an
* output parameter.
*/
if (IsPolymorphicType(procStruct->prorettype) &&
num_out_args == 0)
{
(void) plpgsql_build_variable("$0", 0,
build_datatype(typeTup,
-1,
function->fn_input_collation),
true);
}
ReleaseSysCache(typeTup);
}
break;
@ -599,11 +594,11 @@ do_compile(FunctionCallInfo fcinfo,
errhint("The arguments of the trigger can be accessed through TG_NARGS and TG_ARGV instead.")));
/* Add the record for referencing NEW ROW */
rec = plpgsql_build_record("new", 0, true);
rec = plpgsql_build_record("new", 0, RECORDOID, true);
function->new_varno = rec->dno;
/* Add the record for referencing OLD ROW */
rec = plpgsql_build_record("old", 0, true);
rec = plpgsql_build_record("old", 0, RECORDOID, true);
function->old_varno = rec->dno;
/* Add the variable tg_name */
@ -1240,19 +1235,22 @@ resolve_column_ref(ParseState *pstate, PLpgSQL_expr *expr,
if (nnames == nnames_field)
{
/* colname could be a field in this record */
PLpgSQL_rec *rec = (PLpgSQL_rec *) estate->datums[nse->itemno];
int i;
/* search for a datum referencing this field */
for (i = 0; i < estate->ndatums; i++)
i = rec->firstfield;
while (i >= 0)
{
PLpgSQL_recfield *fld = (PLpgSQL_recfield *) estate->datums[i];
if (fld->dtype == PLPGSQL_DTYPE_RECFIELD &&
fld->recparentno == nse->itemno &&
strcmp(fld->fieldname, colname) == 0)
Assert(fld->dtype == PLPGSQL_DTYPE_RECFIELD &&
fld->recparentno == nse->itemno);
if (strcmp(fld->fieldname, colname) == 0)
{
return make_datum_param(expr, i, cref->location);
}
i = fld->nextfield;
}
/*
@ -1270,34 +1268,6 @@ resolve_column_ref(ParseState *pstate, PLpgSQL_expr *expr,
parser_errposition(pstate, cref->location)));
}
break;
case PLPGSQL_NSTYPE_ROW:
if (nnames == nnames_wholerow)
return make_datum_param(expr, nse->itemno, cref->location);
if (nnames == nnames_field)
{
/* colname could be a field in this row */
PLpgSQL_row *row = (PLpgSQL_row *) estate->datums[nse->itemno];
int i;
for (i = 0; i < row->nfields; i++)
{
if (row->fieldnames[i] &&
strcmp(row->fieldnames[i], colname) == 0)
{
return make_datum_param(expr, row->varnos[i],
cref->location);
}
}
/* Not found, so throw error or return NULL */
if (error_if_no_field)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("record \"%s\" has no field \"%s\"",
(nnames_field == 1) ? name1 : name2,
colname),
parser_errposition(pstate, cref->location)));
}
break;
default:
elog(ERROR, "unrecognized plpgsql itemtype: %d", nse->itemtype);
}
@ -1385,7 +1355,6 @@ plpgsql_parse_word(char *word1, const char *yytxt,
switch (ns->itemtype)
{
case PLPGSQL_NSTYPE_VAR:
case PLPGSQL_NSTYPE_ROW:
case PLPGSQL_NSTYPE_REC:
wdatum->datum = plpgsql_Datums[ns->itemno];
wdatum->ident = word1;
@ -1461,14 +1430,11 @@ plpgsql_parse_dblword(char *word1, char *word2,
* datum whether it is or not --- any error will be
* detected later.
*/
PLpgSQL_rec *rec;
PLpgSQL_recfield *new;
new = palloc(sizeof(PLpgSQL_recfield));
new->dtype = PLPGSQL_DTYPE_RECFIELD;
new->fieldname = pstrdup(word2);
new->recparentno = ns->itemno;
plpgsql_adddatum((PLpgSQL_datum *) new);
rec = (PLpgSQL_rec *) (plpgsql_Datums[ns->itemno]);
new = plpgsql_build_recfield(rec, word2);
wdatum->datum = (PLpgSQL_datum *) new;
}
@ -1482,43 +1448,6 @@ plpgsql_parse_dblword(char *word1, char *word2,
wdatum->idents = idents;
return true;
case PLPGSQL_NSTYPE_ROW:
if (nnames == 1)
{
/*
* First word is a row name, so second word could be a
* field in this row. Again, no error now if it
* isn't.
*/
PLpgSQL_row *row;
int i;
row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
for (i = 0; i < row->nfields; i++)
{
if (row->fieldnames[i] &&
strcmp(row->fieldnames[i], word2) == 0)
{
wdatum->datum = plpgsql_Datums[row->varnos[i]];
wdatum->ident = NULL;
wdatum->quoted = false; /* not used */
wdatum->idents = idents;
return true;
}
}
/* fall through to return CWORD */
}
else
{
/* Block-qualified reference to row variable. */
wdatum->datum = plpgsql_Datums[ns->itemno];
wdatum->ident = NULL;
wdatum->quoted = false; /* not used */
wdatum->idents = idents;
return true;
}
break;
default:
break;
}
@ -1572,14 +1501,11 @@ plpgsql_parse_tripword(char *word1, char *word2, char *word3,
* words 1/2 are a record name, so third word could be
* a field in this record.
*/
PLpgSQL_rec *rec;
PLpgSQL_recfield *new;
new = palloc(sizeof(PLpgSQL_recfield));
new->dtype = PLPGSQL_DTYPE_RECFIELD;
new->fieldname = pstrdup(word3);
new->recparentno = ns->itemno;
plpgsql_adddatum((PLpgSQL_datum *) new);
rec = (PLpgSQL_rec *) (plpgsql_Datums[ns->itemno]);
new = plpgsql_build_recfield(rec, word3);
wdatum->datum = (PLpgSQL_datum *) new;
wdatum->ident = NULL;
@ -1588,32 +1514,6 @@ plpgsql_parse_tripword(char *word1, char *word2, char *word3,
return true;
}
case PLPGSQL_NSTYPE_ROW:
{
/*
* words 1/2 are a row name, so third word could be a
* field in this row.
*/
PLpgSQL_row *row;
int i;
row = (PLpgSQL_row *) (plpgsql_Datums[ns->itemno]);
for (i = 0; i < row->nfields; i++)
{
if (row->fieldnames[i] &&
strcmp(row->fieldnames[i], word3) == 0)
{
wdatum->datum = plpgsql_Datums[row->varnos[i]];
wdatum->ident = NULL;
wdatum->quoted = false; /* not used */
wdatum->idents = idents;
return true;
}
}
/* fall through to return CWORD */
break;
}
default:
break;
}
@ -1864,8 +1764,8 @@ plpgsql_parse_cwordrowtype(List *idents)
* plpgsql_build_variable - build a datum-array entry of a given
* datatype
*
* The returned struct may be a PLpgSQL_var, PLpgSQL_row, or
* PLpgSQL_rec depending on the given datatype, and is allocated via
* The returned struct may be a PLpgSQL_var or PLpgSQL_rec
* depending on the given datatype, and is allocated via
* palloc. The struct is automatically added to the current datum
* array, and optionally to the current namespace.
*/
@ -1902,31 +1802,13 @@ plpgsql_build_variable(const char *refname, int lineno, PLpgSQL_type *dtype,
result = (PLpgSQL_variable *) var;
break;
}
case PLPGSQL_TTYPE_ROW:
{
/* Composite type -- build a row variable */
PLpgSQL_row *row;
row = build_row_from_class(dtype->typrelid);
row->dtype = PLPGSQL_DTYPE_ROW;
row->refname = pstrdup(refname);
row->lineno = lineno;
plpgsql_adddatum((PLpgSQL_datum *) row);
if (add2namespace)
plpgsql_ns_additem(PLPGSQL_NSTYPE_ROW,
row->dno,
refname);
result = (PLpgSQL_variable *) row;
break;
}
case PLPGSQL_TTYPE_REC:
{
/* "record" type -- build a record variable */
/* Composite type -- build a record variable */
PLpgSQL_rec *rec;
rec = plpgsql_build_record(refname, lineno, add2namespace);
rec = plpgsql_build_record(refname, lineno, dtype->typoid,
add2namespace);
result = (PLpgSQL_variable *) rec;
break;
}
@ -1950,7 +1832,8 @@ plpgsql_build_variable(const char *refname, int lineno, PLpgSQL_type *dtype,
* Build empty named record variable, and optionally add it to namespace
*/
PLpgSQL_rec *
plpgsql_build_record(const char *refname, int lineno, bool add2namespace)
plpgsql_build_record(const char *refname, int lineno, Oid rectypeid,
bool add2namespace)
{
PLpgSQL_rec *rec;
@ -1958,10 +1841,9 @@ plpgsql_build_record(const char *refname, int lineno, bool add2namespace)
rec->dtype = PLPGSQL_DTYPE_REC;
rec->refname = pstrdup(refname);
rec->lineno = lineno;
rec->tup = NULL;
rec->tupdesc = NULL;
rec->freetup = false;
rec->freetupdesc = false;
rec->rectypeid = rectypeid;
rec->firstfield = -1;
rec->erh = NULL;
plpgsql_adddatum((PLpgSQL_datum *) rec);
if (add2namespace)
plpgsql_ns_additem(PLPGSQL_NSTYPE_REC, rec->dno, rec->refname);
@ -1969,104 +1851,9 @@ plpgsql_build_record(const char *refname, int lineno, bool add2namespace)
return rec;
}
/*
* Build a row-variable data structure given the pg_class OID.
*/
static PLpgSQL_row *
build_row_from_class(Oid classOid)
{
PLpgSQL_row *row;
Relation rel;
Form_pg_class classStruct;
const char *relname;
int i;
/*
* Open the relation to get info.
*/
rel = relation_open(classOid, AccessShareLock);
classStruct = RelationGetForm(rel);
relname = RelationGetRelationName(rel);
/*
* Accept relation, sequence, view, materialized view, composite type, or
* foreign table.
*/
if (classStruct->relkind != RELKIND_RELATION &&
classStruct->relkind != RELKIND_SEQUENCE &&
classStruct->relkind != RELKIND_VIEW &&
classStruct->relkind != RELKIND_MATVIEW &&
classStruct->relkind != RELKIND_COMPOSITE_TYPE &&
classStruct->relkind != RELKIND_FOREIGN_TABLE &&
classStruct->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("relation \"%s\" is not a table", relname)));
/*
* Create a row datum entry and all the required variables that it will
* point to.
*/
row = palloc0(sizeof(PLpgSQL_row));
row->dtype = PLPGSQL_DTYPE_ROW;
row->rowtupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
row->nfields = classStruct->relnatts;
row->fieldnames = palloc(sizeof(char *) * row->nfields);
row->varnos = palloc(sizeof(int) * row->nfields);
for (i = 0; i < row->nfields; i++)
{
Form_pg_attribute attrStruct;
/*
* Get the attribute and check for dropped column
*/
attrStruct = TupleDescAttr(row->rowtupdesc, i);
if (!attrStruct->attisdropped)
{
char *attname;
char refname[(NAMEDATALEN * 2) + 100];
PLpgSQL_variable *var;
attname = NameStr(attrStruct->attname);
snprintf(refname, sizeof(refname), "%s.%s", relname, attname);
/*
* Create the internal variable for the field
*
* We know if the table definitions contain a default value or if
* the field is declared in the table as NOT NULL. But it's
* possible to create a table field as NOT NULL without a default
* value and that would lead to problems later when initializing
* the variables due to entering a block at execution time. Thus
* we ignore this information for now.
*/
var = plpgsql_build_variable(refname, 0,
plpgsql_build_datatype(attrStruct->atttypid,
attrStruct->atttypmod,
attrStruct->attcollation),
false);
/* Add the variable to the row */
row->fieldnames[i] = attname;
row->varnos[i] = var->dno;
}
else
{
/* Leave a hole in the row structure for the dropped col */
row->fieldnames[i] = NULL;
row->varnos[i] = -1;
}
}
relation_close(rel, AccessShareLock);
return row;
}
/*
* Build a row-variable data structure given the component variables.
* Include a rowtupdesc, since we will need to materialize the row result.
*/
static PLpgSQL_row *
build_row_from_vars(PLpgSQL_variable **vars, int numvars)
@ -2084,9 +1871,9 @@ build_row_from_vars(PLpgSQL_variable **vars, int numvars)
for (i = 0; i < numvars; i++)
{
PLpgSQL_variable *var = vars[i];
Oid typoid = RECORDOID;
int32 typmod = -1;
Oid typcoll = InvalidOid;
Oid typoid;
int32 typmod;
Oid typcoll;
switch (var->dtype)
{
@ -2097,19 +1884,17 @@ build_row_from_vars(PLpgSQL_variable **vars, int numvars)
break;
case PLPGSQL_DTYPE_REC:
break;
case PLPGSQL_DTYPE_ROW:
if (((PLpgSQL_row *) var)->rowtupdesc)
{
typoid = ((PLpgSQL_row *) var)->rowtupdesc->tdtypeid;
typmod = ((PLpgSQL_row *) var)->rowtupdesc->tdtypmod;
/* composite types have no collation */
}
typoid = ((PLpgSQL_rec *) var)->rectypeid;
typmod = -1; /* don't know typmod, if it's used at all */
typcoll = InvalidOid; /* composite types have no collation */
break;
default:
elog(ERROR, "unrecognized dtype: %d", var->dtype);
typoid = InvalidOid; /* keep compiler quiet */
typmod = 0;
typcoll = InvalidOid;
break;
}
row->fieldnames[i] = var->refname;
@ -2125,6 +1910,46 @@ build_row_from_vars(PLpgSQL_variable **vars, int numvars)
return row;
}
/*
* Build a RECFIELD datum for the named field of the specified record variable
*
* If there's already such a datum, just return it; we don't need duplicates.
*/
PLpgSQL_recfield *
plpgsql_build_recfield(PLpgSQL_rec *rec, const char *fldname)
{
PLpgSQL_recfield *recfield;
int i;
/* search for an existing datum referencing this field */
i = rec->firstfield;
while (i >= 0)
{
PLpgSQL_recfield *fld = (PLpgSQL_recfield *) plpgsql_Datums[i];
Assert(fld->dtype == PLPGSQL_DTYPE_RECFIELD &&
fld->recparentno == rec->dno);
if (strcmp(fld->fieldname, fldname) == 0)
return fld;
i = fld->nextfield;
}
/* nope, so make a new one */
recfield = palloc0(sizeof(PLpgSQL_recfield));
recfield->dtype = PLPGSQL_DTYPE_RECFIELD;
recfield->fieldname = pstrdup(fldname);
recfield->recparentno = rec->dno;
recfield->rectupledescid = INVALID_TUPLEDESC_IDENTIFIER;
plpgsql_adddatum((PLpgSQL_datum *) recfield);
/* now we can link it into the parent's chain */
recfield->nextfield = rec->firstfield;
rec->firstfield = recfield->dno;
return recfield;
}
/*
* plpgsql_build_datatype
* Build PLpgSQL_type struct given type OID, typmod, and collation.
@ -2171,14 +1996,18 @@ build_datatype(HeapTuple typeTup, int32 typmod, Oid collation)
switch (typeStruct->typtype)
{
case TYPTYPE_BASE:
case TYPTYPE_DOMAIN:
case TYPTYPE_ENUM:
case TYPTYPE_RANGE:
typ->ttype = PLPGSQL_TTYPE_SCALAR;
break;
case TYPTYPE_COMPOSITE:
Assert(OidIsValid(typeStruct->typrelid));
typ->ttype = PLPGSQL_TTYPE_ROW;
typ->ttype = PLPGSQL_TTYPE_REC;
break;
case TYPTYPE_DOMAIN:
if (type_is_rowtype(typeStruct->typbasetype))
typ->ttype = PLPGSQL_TTYPE_REC;
else
typ->ttype = PLPGSQL_TTYPE_SCALAR;
break;
case TYPTYPE_PSEUDO:
if (typ->typoid == RECORDOID)
@ -2194,7 +2023,6 @@ build_datatype(HeapTuple typeTup, int32 typmod, Oid collation)
typ->typlen = typeStruct->typlen;
typ->typbyval = typeStruct->typbyval;
typ->typtype = typeStruct->typtype;
typ->typrelid = typeStruct->typrelid;
typ->collation = typeStruct->typcollation;
if (OidIsValid(collation) && OidIsValid(typ->collation))
typ->collation = collation;

File diff suppressed because it is too large Load Diff

View File

@ -1618,15 +1618,16 @@ plpgsql_dumptree(PLpgSQL_function *func)
printf("ROW %-16s fields", row->refname);
for (i = 0; i < row->nfields; i++)
{
if (row->fieldnames[i])
printf(" %s=var %d", row->fieldnames[i],
row->varnos[i]);
printf(" %s=var %d", row->fieldnames[i],
row->varnos[i]);
}
printf("\n");
}
break;
case PLPGSQL_DTYPE_REC:
printf("REC %s\n", ((PLpgSQL_rec *) d)->refname);
printf("REC %-16s typoid %u\n",
((PLpgSQL_rec *) d)->refname,
((PLpgSQL_rec *) d)->rectypeid);
break;
case PLPGSQL_DTYPE_RECFIELD:
printf("RECFIELD %-16s of REC %d\n",

View File

@ -512,7 +512,7 @@ decl_statement : decl_varname decl_const decl_datatype decl_collate decl_notnull
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("row or record variable cannot be CONSTANT"),
errmsg("record variable cannot be CONSTANT"),
parser_errposition(@2)));
}
if ($5)
@ -522,7 +522,7 @@ decl_statement : decl_varname decl_const decl_datatype decl_collate decl_notnull
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("row or record variable cannot be NOT NULL"),
errmsg("record variable cannot be NOT NULL"),
parser_errposition(@4)));
}
@ -533,7 +533,7 @@ decl_statement : decl_varname decl_const decl_datatype decl_collate decl_notnull
else
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("default value for row or record variable is not supported"),
errmsg("default value for record variable is not supported"),
parser_errposition(@5)));
}
}
@ -1333,7 +1333,7 @@ for_control : for_variable K_IN
{
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("loop variable of loop over rows must be a record or row variable or list of scalar variables"),
errmsg("loop variable of loop over rows must be a record variable or list of scalar variables"),
parser_errposition(@1)));
}
new->query = expr;
@ -1386,6 +1386,7 @@ for_control : for_variable K_IN
new->var = (PLpgSQL_variable *)
plpgsql_build_record($1.name,
$1.lineno,
RECORDOID,
true);
$$ = (PLpgSQL_stmt *) new;
@ -1524,7 +1525,7 @@ for_control : for_variable K_IN
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("loop variable of loop over rows must be a record or row variable or list of scalar variables"),
errmsg("loop variable of loop over rows must be a record variable or list of scalar variables"),
parser_errposition(@1)));
}
@ -3328,7 +3329,7 @@ check_assignable(PLpgSQL_datum *datum, int location)
parser_errposition(location)));
break;
case PLPGSQL_DTYPE_ROW:
/* always assignable? */
/* always assignable? Shouldn't we check member vars? */
break;
case PLPGSQL_DTYPE_REC:
/* always assignable? What about NEW/OLD? */
@ -3385,7 +3386,7 @@ read_into_target(PLpgSQL_variable **target, bool *strict)
if ((tok = yylex()) == ',')
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("record or row variable cannot be part of multiple-item INTO list"),
errmsg("record variable cannot be part of multiple-item INTO list"),
parser_errposition(yylloc)));
plpgsql_push_back_token(tok);
}

View File

@ -443,14 +443,15 @@ plpgsql_validator(PG_FUNCTION_ARGS)
}
/* Disallow pseudotypes in arguments (either IN or OUT) */
/* except for polymorphic */
/* except for RECORD and polymorphic */
numargs = get_func_arg_info(tuple,
&argtypes, &argnames, &argmodes);
for (i = 0; i < numargs; i++)
{
if (get_typtype(argtypes[i]) == TYPTYPE_PSEUDO)
{
if (!IsPolymorphicType(argtypes[i]))
if (argtypes[i] != RECORDOID &&
!IsPolymorphicType(argtypes[i]))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("PL/pgSQL functions cannot accept type %s",

View File

@ -20,6 +20,8 @@
#include "commands/event_trigger.h"
#include "commands/trigger.h"
#include "executor/spi.h"
#include "utils/expandedrecord.h"
/**********************************************************************
* Definitions
@ -37,10 +39,9 @@
*/
typedef enum PLpgSQL_nsitem_type
{
PLPGSQL_NSTYPE_LABEL,
PLPGSQL_NSTYPE_VAR,
PLPGSQL_NSTYPE_ROW,
PLPGSQL_NSTYPE_REC
PLPGSQL_NSTYPE_LABEL, /* block label */
PLPGSQL_NSTYPE_VAR, /* scalar variable */
PLPGSQL_NSTYPE_REC /* composite variable */
} PLpgSQL_nsitem_type;
/*
@ -72,9 +73,8 @@ typedef enum PLpgSQL_datum_type
typedef enum PLpgSQL_type_type
{
PLPGSQL_TTYPE_SCALAR, /* scalar types and domains */
PLPGSQL_TTYPE_ROW, /* composite types */
PLPGSQL_TTYPE_REC, /* RECORD pseudotype */
PLPGSQL_TTYPE_PSEUDO /* other pseudotypes */
PLPGSQL_TTYPE_REC, /* composite types, including RECORD */
PLPGSQL_TTYPE_PSEUDO /* pseudotypes */
} PLpgSQL_type_type;
/*
@ -183,7 +183,6 @@ typedef struct PLpgSQL_type
int16 typlen; /* stuff copied from its pg_type entry */
bool typbyval;
char typtype;
Oid typrelid;
Oid collation; /* from pg_type, but can be overridden */
bool typisarray; /* is "true" array, or domain over one */
int32 atttypmod; /* typmod (taken from someplace else) */
@ -274,7 +273,12 @@ typedef struct PLpgSQL_var
} PLpgSQL_var;
/*
* Row variable
* Row variable - this represents one or more variables that are listed in an
* INTO clause, FOR-loop targetlist, cursor argument list, etc. We also use
* a row to represent a function's OUT parameters when there's more than one.
*
* Note that there's no way to name the row as such from PL/pgSQL code,
* so many functions don't need to support these.
*/
typedef struct PLpgSQL_row
{
@ -283,21 +287,20 @@ typedef struct PLpgSQL_row
char *refname;
int lineno;
/* Note: TupleDesc is only set up for named rowtypes, else it is NULL. */
/*
* rowtupdesc is only set up if we might need to convert the row into a
* composite datum, which currently only happens for OUT parameters.
* Otherwise it is NULL.
*/
TupleDesc rowtupdesc;
/*
* Note: if the underlying rowtype contains a dropped column, the
* corresponding fieldnames[] entry will be NULL, and there is no
* corresponding var (varnos[] will be -1).
*/
int nfields;
char **fieldnames;
int *varnos;
} PLpgSQL_row;
/*
* Record variable (non-fixed structure)
* Record variable (any composite type, including RECORD)
*/
typedef struct PLpgSQL_rec
{
@ -305,11 +308,11 @@ typedef struct PLpgSQL_rec
int dno;
char *refname;
int lineno;
HeapTuple tup;
TupleDesc tupdesc;
bool freetup;
bool freetupdesc;
Oid rectypeid; /* declared type of variable */
/* RECFIELDs for this record are chained together for easy access */
int firstfield; /* dno of first RECFIELD, or -1 if none */
/* We always store record variables as "expanded" records */
ExpandedRecordHeader *erh;
} PLpgSQL_rec;
/*
@ -319,8 +322,12 @@ typedef struct PLpgSQL_recfield
{
PLpgSQL_datum_type dtype;
int dno;
char *fieldname;
char *fieldname; /* name of field */
int recparentno; /* dno of parent record */
int nextfield; /* dno of next child, or -1 if none */
uint64 rectupledescid; /* record's tupledesc ID as of last lookup */
ExpandedRecordFieldInfo finfo; /* field's attnum and type info */
/* if rectupledescid == INVALID_TUPLEDESC_IDENTIFIER, finfo isn't valid */
} PLpgSQL_recfield;
/*
@ -903,12 +910,12 @@ typedef struct PLpgSQL_execstate
bool readonly_func;
TupleDesc rettupdesc;
char *exitlabel; /* the "target" label of the current EXIT or
* CONTINUE stmt, if any */
ErrorData *cur_error; /* current exception handler's error */
Tuplestorestate *tuple_store; /* SRFs accumulate results here */
TupleDesc tuple_store_desc; /* descriptor for tuples in tuple_store */
MemoryContext tuple_store_cxt;
ResourceOwner tuple_store_owner;
ReturnSetInfo *rsi;
@ -917,6 +924,8 @@ typedef struct PLpgSQL_execstate
int found_varno;
int ndatums;
PLpgSQL_datum **datums;
/* context containing variable values (same as func's SPI_proc context) */
MemoryContext datum_context;
/*
* paramLI is what we use to pass local variable values to the executor.
@ -1088,7 +1097,9 @@ extern PLpgSQL_variable *plpgsql_build_variable(const char *refname, int lineno,
PLpgSQL_type *dtype,
bool add2namespace);
extern PLpgSQL_rec *plpgsql_build_record(const char *refname, int lineno,
bool add2namespace);
Oid rectypeid, bool add2namespace);
extern PLpgSQL_recfield *plpgsql_build_recfield(PLpgSQL_rec *rec,
const char *fldname);
extern int plpgsql_recognize_err_condition(const char *condname,
bool allow_sqlstate);
extern PLpgSQL_condition *plpgsql_parse_err_condition(char *condname);

View File

@ -0,0 +1,441 @@
--
-- Tests for PL/pgSQL handling of composite (record) variables
--
create type two_int4s as (f1 int4, f2 int4);
create type two_int8s as (q1 int8, q2 int8);
-- base-case return of a composite type
create function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1,1)::two_int8s; end $$;
select retc(42);
-- ok to return a matching record type
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1::int8, 1::int8); end $$;
select retc(42);
-- we don't currently support implicit casting
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1,1); end $$;
select retc(42);
-- nor extra columns
create or replace function retc(int) returns two_int8s language plpgsql as
$$ begin return row($1::int8, 1::int8, 42); end $$;
select retc(42);
-- same cases with an intermediate "record" variable
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1::int8, 1::int8); return r; end $$;
select retc(42);
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1,1); return r; end $$;
select retc(42);
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r record; begin r := row($1::int8, 1::int8, 42); return r; end $$;
select retc(42);
-- but, for mostly historical reasons, we do convert when assigning
-- to a named-composite-type variable
create or replace function retc(int) returns two_int8s language plpgsql as
$$ declare r two_int8s; begin r := row($1::int8, 1::int8, 42); return r; end $$;
select retc(42);
do $$ declare c two_int8s;
begin c := row(1,2); raise notice 'c = %', c; end$$;
do $$ declare c two_int8s;
begin for c in select 1,2 loop raise notice 'c = %', c; end loop; end$$;
do $$ declare c4 two_int4s; c8 two_int8s;
begin
c8 := row(1,2);
c4 := c8;
c8 := c4;
raise notice 'c4 = %', c4;
raise notice 'c8 = %', c8;
end$$;
-- check passing composite result to another function
create function getq1(two_int8s) returns int8 language plpgsql as $$
declare r two_int8s; begin r := $1; return r.q1; end $$;
select getq1(retc(344));
select getq1(row(1,2));
do $$
declare r1 two_int8s; r2 record; x int8;
begin
r1 := retc(345);
perform getq1(r1);
x := getq1(r1);
raise notice 'x = %', x;
r2 := retc(346);
perform getq1(r2);
x := getq1(r2);
raise notice 'x = %', x;
end$$;
-- check assignments of composites
do $$
declare r1 two_int8s; r2 two_int8s; r3 record; r4 record;
begin
r1 := row(1,2);
raise notice 'r1 = %', r1;
r1 := r1; -- shouldn't do anything
raise notice 'r1 = %', r1;
r2 := r1;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r2.q2 = r1.q1 + 3; -- check that r2 has distinct storage
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r1 := null;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r1 := row(7,11)::two_int8s;
r2 := r1;
raise notice 'r1 = %', r1;
raise notice 'r2 = %', r2;
r3 := row(1,2);
r4 := r3;
raise notice 'r3 = %', r3;
raise notice 'r4 = %', r4;
r4.f1 := r4.f1 + 3; -- check that r4 has distinct storage
raise notice 'r3 = %', r3;
raise notice 'r4 = %', r4;
r1 := r3;
raise notice 'r1 = %', r1;
r4 := r1;
raise notice 'r4 = %', r4;
r4.q2 := r4.q2 + 1; -- r4's field names have changed
raise notice 'r4 = %', r4;
end$$;
-- fields of named-type vars read as null if uninitialized
do $$
declare r1 two_int8s;
begin
raise notice 'r1 = %', r1;
raise notice 'r1.q1 = %', r1.q1;
raise notice 'r1.q2 = %', r1.q2;
raise notice 'r1 = %', r1;
end$$;
do $$
declare r1 two_int8s;
begin
raise notice 'r1.q1 = %', r1.q1;
raise notice 'r1.q2 = %', r1.q2;
raise notice 'r1 = %', r1;
raise notice 'r1.nosuchfield = %', r1.nosuchfield;
end$$;
-- records, not so much
do $$
declare r1 record;
begin
raise notice 'r1 = %', r1;
raise notice 'r1.f1 = %', r1.f1;
raise notice 'r1.f2 = %', r1.f2;
raise notice 'r1 = %', r1;
end$$;
-- but OK if you assign first
do $$
declare r1 record;
begin
raise notice 'r1 = %', r1;
r1 := row(1,2);
raise notice 'r1.f1 = %', r1.f1;
raise notice 'r1.f2 = %', r1.f2;
raise notice 'r1 = %', r1;
raise notice 'r1.nosuchfield = %', r1.nosuchfield;
end$$;
-- check repeated assignments to composite fields
create table some_table (id int, data text);
do $$
declare r some_table;
begin
r := (23, 'skidoo');
for i in 1 .. 10 loop
r.id := r.id + i;
r.data := r.data || ' ' || i;
end loop;
raise notice 'r = %', r;
end$$;
-- check behavior of function declared to return "record"
create function returnsrecord(int) returns record language plpgsql as
$$ begin return row($1,$1+1); end $$;
select returnsrecord(42);
select * from returnsrecord(42) as r(x int, y int);
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
select * from returnsrecord(42) as r(x int, y bigint); -- fail
-- same with an intermediate record variable
create or replace function returnsrecord(int) returns record language plpgsql as
$$ declare r record; begin r := row($1,$1+1); return r; end $$;
select returnsrecord(42);
select * from returnsrecord(42) as r(x int, y int);
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
select * from returnsrecord(42) as r(x int, y bigint); -- fail
-- should work the same with a missing column in the actual result value
create table has_hole(f1 int, f2 int, f3 int);
alter table has_hole drop column f2;
create or replace function returnsrecord(int) returns record language plpgsql as
$$ begin return row($1,$1+1)::has_hole; end $$;
select returnsrecord(42);
select * from returnsrecord(42) as r(x int, y int);
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
select * from returnsrecord(42) as r(x int, y bigint); -- fail
-- same with an intermediate record variable
create or replace function returnsrecord(int) returns record language plpgsql as
$$ declare r record; begin r := row($1,$1+1)::has_hole; return r; end $$;
select returnsrecord(42);
select * from returnsrecord(42) as r(x int, y int);
select * from returnsrecord(42) as r(x int, y int, z int); -- fail
select * from returnsrecord(42) as r(x int, y bigint); -- fail
-- check access to a field of an argument declared "record"
create function getf1(x record) returns int language plpgsql as
$$ begin return x.f1; end $$;
select getf1(1);
select getf1(row(1,2));
select getf1(row(1,2)::two_int8s);
select getf1(row(1,2));
-- check behavior when assignment to FOR-loop variable requires coercion
do $$
declare r two_int8s;
begin
for r in select i, i+1 from generate_series(1,4) i
loop
raise notice 'r = %', r;
end loop;
end$$;
-- check behavior when returning setof composite
create function returnssetofholes() returns setof has_hole language plpgsql as
$$
declare r record;
h has_hole;
begin
return next h;
r := (1,2);
h := (3,4);
return next r;
return next h;
return next row(5,6);
return next row(7,8)::has_hole;
end$$;
select returnssetofholes();
create or replace function returnssetofholes() returns setof has_hole language plpgsql as
$$
declare r record;
begin
return next r; -- fails, not assigned yet
end$$;
select returnssetofholes();
create or replace function returnssetofholes() returns setof has_hole language plpgsql as
$$
begin
return next row(1,2,3); -- fails
end$$;
select returnssetofholes();
-- check behavior with changes of a named rowtype
create table mutable(f1 int, f2 text);
create function sillyaddone(int) returns int language plpgsql as
$$ declare r mutable; begin r.f1 := $1; return r.f1 + 1; end $$;
select sillyaddone(42);
alter table mutable drop column f1;
alter table mutable add column f1 float8;
-- currently, this fails due to cached plan for "r.f1 + 1" expression
select sillyaddone(42);
\c -
-- but it's OK after a reconnect
select sillyaddone(42);
alter table mutable drop column f1;
select sillyaddone(42); -- fail
create function getf3(x mutable) returns int language plpgsql as
$$ begin return x.f3; end $$;
select getf3(null::mutable); -- doesn't work yet
alter table mutable add column f3 int;
select getf3(null::mutable); -- now it works
alter table mutable drop column f3;
select getf3(null::mutable); -- fails again
-- check access to system columns in a record variable
create function sillytrig() returns trigger language plpgsql as
$$begin
raise notice 'old.ctid = %', old.ctid;
raise notice 'old.tableoid = %', old.tableoid::regclass;
return new;
end$$;
create trigger mutable_trig before update on mutable for each row
execute procedure sillytrig();
insert into mutable values ('foo'), ('bar');
update mutable set f2 = f2 || ' baz';
table mutable;
-- check returning a composite datum from a trigger
create or replace function sillytrig() returns trigger language plpgsql as
$$begin
return row(new.*);
end$$;
update mutable set f2 = f2 || ' baz';
table mutable;
create or replace function sillytrig() returns trigger language plpgsql as
$$declare r record;
begin
r := row(new.*);
return r;
end$$;
update mutable set f2 = f2 || ' baz';
table mutable;
--
-- Domains of composite
--
create domain ordered_int8s as two_int8s check((value).q1 <= (value).q2);
create function read_ordered_int8s(p ordered_int8s) returns int8 as $$
begin return p.q1 + p.q2; end
$$ language plpgsql;
select read_ordered_int8s(row(1, 2));
select read_ordered_int8s(row(2, 1)); -- fail
create function build_ordered_int8s(i int8, j int8) returns ordered_int8s as $$
begin return row(i,j); end
$$ language plpgsql;
select build_ordered_int8s(1,2);
select build_ordered_int8s(2,1); -- fail
create function build_ordered_int8s_2(i int8, j int8) returns ordered_int8s as $$
declare r record; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_2(1,2);
select build_ordered_int8s_2(2,1); -- fail
create function build_ordered_int8s_3(i int8, j int8) returns ordered_int8s as $$
declare r two_int8s; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_3(1,2);
select build_ordered_int8s_3(2,1); -- fail
create function build_ordered_int8s_4(i int8, j int8) returns ordered_int8s as $$
declare r ordered_int8s; begin r := row(i,j); return r; end
$$ language plpgsql;
select build_ordered_int8s_4(1,2);
select build_ordered_int8s_4(2,1); -- fail
create function build_ordered_int8s_a(i int8, j int8) returns ordered_int8s[] as $$
begin return array[row(i,j), row(i,j+1)]; end
$$ language plpgsql;
select build_ordered_int8s_a(1,2);
select build_ordered_int8s_a(2,1); -- fail
-- check field assignment
do $$
declare r ordered_int8s;
begin
r.q1 := null;
r.q2 := 43;
r.q1 := 42;
r.q2 := 41; -- fail
end$$;
-- check whole-row assignment
do $$
declare r ordered_int8s;
begin
r := null;
r := row(null,null);
r := row(1,2);
r := row(2,1); -- fail
end$$;
-- check assignment in for-loop
do $$
declare r ordered_int8s;
begin
for r in values (1,2),(3,4),(6,5) loop
raise notice 'r = %', r;
end loop;
end$$;
-- check behavior with toastable fields, too
create type two_texts as (f1 text, f2 text);
create domain ordered_texts as two_texts check((value).f1 <= (value).f2);
create table sometable (id int, a text, b text);
-- b should be compressed, but in-line
insert into sometable values (1, 'a', repeat('ffoob',1000));
-- this b should be out-of-line
insert into sometable values (2, 'a', repeat('ffoob',100000));
-- this pair should fail the domain check
insert into sometable values (3, 'z', repeat('ffoob',100000));
do $$
declare d ordered_texts;
begin
for d in select a, b from sometable loop
raise notice 'succeeded at "%"', d.f1;
end loop;
end$$;
do $$
declare r record; d ordered_texts;
begin
for r in select * from sometable loop
raise notice 'processing row %', r.id;
d := row(r.a, r.b);
end loop;
end$$;
do $$
declare r record; d ordered_texts;
begin
for r in select * from sometable loop
raise notice 'processing row %', r.id;
d := null;
d.f1 := r.a;
d.f2 := r.b;
end loop;
end$$;

View File

@ -384,7 +384,7 @@ PLy_output_setup_func(PLyObToDatum *arg, MemoryContext arg_mcxt,
/* We'll set up the per-field data later */
arg->u.tuple.recdesc = NULL;
arg->u.tuple.typentry = typentry;
arg->u.tuple.tupdescseq = typentry ? typentry->tupDescSeqNo - 1 : 0;
arg->u.tuple.tupdescid = INVALID_TUPLEDESC_IDENTIFIER;
arg->u.tuple.atts = NULL;
arg->u.tuple.natts = 0;
/* Mark this invalid till needed, too */
@ -499,7 +499,7 @@ PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt,
/* We'll set up the per-field data later */
arg->u.tuple.recdesc = NULL;
arg->u.tuple.typentry = typentry;
arg->u.tuple.tupdescseq = typentry ? typentry->tupDescSeqNo - 1 : 0;
arg->u.tuple.tupdescid = INVALID_TUPLEDESC_IDENTIFIER;
arg->u.tuple.atts = NULL;
arg->u.tuple.natts = 0;
}
@ -969,11 +969,11 @@ PLyObject_ToComposite(PLyObToDatum *arg, PyObject *plrv,
/* We should have the descriptor of the type's typcache entry */
Assert(desc == arg->u.tuple.typentry->tupDesc);
/* Detect change of descriptor, update cache if needed */
if (arg->u.tuple.tupdescseq != arg->u.tuple.typentry->tupDescSeqNo)
if (arg->u.tuple.tupdescid != arg->u.tuple.typentry->tupDesc_identifier)
{
PLy_output_setup_tuple(arg, desc,
PLy_current_execution_context()->curr_proc);
arg->u.tuple.tupdescseq = arg->u.tuple.typentry->tupDescSeqNo;
arg->u.tuple.tupdescid = arg->u.tuple.typentry->tupDesc_identifier;
}
}
else

View File

@ -42,7 +42,7 @@ typedef struct PLyTupleToOb
TupleDesc recdesc;
/* If we're dealing with a named composite type, these fields are set: */
TypeCacheEntry *typentry; /* typcache entry for type */
int64 tupdescseq; /* last tupdesc seqno seen in typcache */
uint64 tupdescid; /* last tupdesc identifier seen in typcache */
/* These fields are NULL/0 if not yet set: */
PLyDatumToOb *atts; /* array of per-column conversion info */
int natts; /* length of array */
@ -107,7 +107,7 @@ typedef struct PLyObToTuple
TupleDesc recdesc;
/* If we're dealing with a named composite type, these fields are set: */
TypeCacheEntry *typentry; /* typcache entry for type */
int64 tupdescseq; /* last tupdesc seqno seen in typcache */
uint64 tupdescid; /* last tupdesc identifier seen in typcache */
/* These fields are NULL/0 if not yet set: */
PLyObToDatum *atts; /* array of per-column conversion info */
int natts; /* length of array */

View File

@ -4595,23 +4595,32 @@ begin
x int;
y int := i;
r record;
c int8_tbl;
begin
if i = 1 then
x := 42;
r := row(i, i+1);
c := row(i, i+1);
end if;
raise notice 'x = %', x;
raise notice 'y = %', y;
raise notice 'r = %', r;
raise notice 'c = %', c;
end;
end loop;
end$$;
NOTICE: x = 42
NOTICE: y = 1
NOTICE: r = (1,2)
NOTICE: c = (1,2)
NOTICE: x = <NULL>
NOTICE: y = 2
ERROR: record "r" is not assigned yet
NOTICE: r = <NULL>
NOTICE: c = <NULL>
NOTICE: x = <NULL>
NOTICE: y = 3
NOTICE: r = <NULL>
NOTICE: c = <NULL>
\set VERBOSITY default
-- Check handling of conflicts between plpgsql vars and table columns.
set plpgsql.variable_conflict = error;

View File

@ -3745,14 +3745,17 @@ begin
x int;
y int := i;
r record;
c int8_tbl;
begin
if i = 1 then
x := 42;
r := row(i, i+1);
c := row(i, i+1);
end if;
raise notice 'x = %', x;
raise notice 'y = %', y;
raise notice 'r = %', r;
raise notice 'c = %', c;
end;
end loop;
end$$;