postgresql/src/backend/executor/tqueue.c

1031 lines
28 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
*
* tqueue.c
* Use shm_mq to send & receive tuples between parallel backends
*
* A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
* under the hood, writes tuples from the executor to a shm_mq. If
* necessary, it also writes control messages describing transient
* record types used within the tuple.
*
* A TupleQueueReader reads tuples, and control messages if any are sent,
* from a shm_mq and returns the tuples. If transient record types are
* in use, it registers those types based on the received control messages
* and rewrites the typmods sent by the remote side to the corresponding
* local record typmods.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/executor/tqueue.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
2015-11-06 22:58:45 +01:00
#include "catalog/pg_type.h"
#include "executor/tqueue.h"
2015-11-06 22:58:45 +01:00
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
2015-11-06 22:58:45 +01:00
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rangetypes.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
/*
 * Classification of the processing a datum needs so that any transient
 * record typmods contained in it can be located (sender side) or
 * translated (receiver side).
 */
typedef enum
{
	TQUEUE_REMAP_NONE = 0,		/* no special processing required */
	TQUEUE_REMAP_ARRAY = 1,		/* array */
	TQUEUE_REMAP_RANGE = 2,		/* range */
	TQUEUE_REMAP_RECORD = 3		/* composite type, named or anonymous */
} RemapClass;
2015-11-06 22:58:45 +01:00
/*
 * Per-tuple-descriptor summary of remapping requirements: one RemapClass
 * entry for each attribute of the descriptor.
 */
typedef struct
{
	int			natts;			/* number of entries in mapping[] */
	RemapClass	mapping[FLEXIBLE_ARRAY_MEMBER]; /* class of each attribute */
} RemapInfo;
typedef struct
{
DestReceiver pub;
shm_mq_handle *handle;
2015-11-06 22:58:45 +01:00
MemoryContext tmpcontext;
HTAB *recordhtab;
char mode;
TupleDesc tupledesc;
RemapInfo *remapinfo;
2016-06-10 00:02:36 +02:00
} TQueueDestReceiver;
2015-11-06 22:58:45 +01:00
/*
 * Entry of the reader's typmod hash table, translating a typmod assigned by
 * the sending backend into the equivalent typmod in this backend.
 */
typedef struct RecordTypemodMap
{
	int			remotetypmod;	/* hash key: typmod used by the sender */
	int			localtypmod;	/* typmod registered locally for that type */
} RecordTypemodMap;
2015-11-06 22:58:45 +01:00
/*
 * State for reading tuples, and interleaved control messages, out of a
 * shm_mq.
 */
struct TupleQueueReader
{
	shm_mq_handle *queue;		/* queue the messages are read from */
	char		mode;			/* current TUPLE_QUEUE_MODE_xxx of the queue */
	TupleDesc	tupledesc;		/* descriptor of the top-level tuples */
	RemapInfo  *remapinfo;		/* remap info for tupledesc, NULL if none */
	HTAB	   *typmodmap;		/* RecordTypemodMap table, NULL until needed */
};
2015-11-06 22:58:45 +01:00
/*
 * Queue modes.  A one-byte message carrying one of these values switches
 * the queue between control messages and tuple data.
 */
#define TUPLE_QUEUE_MODE_CONTROL	'c' /* messages describe record typmods */
#define TUPLE_QUEUE_MODE_DATA		'd' /* messages are tuples */
2016-06-10 00:02:36 +02:00
/* Sender side: find transient record typmods and describe them. */
static void tqueueWalk(TQueueDestReceiver *tqueue,
		   RemapClass walktype,
		   Datum value);
static void tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value);
static void tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value);
static void tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value);
static void tqueueSendTypmodInfo(TQueueDestReceiver *tqueue,
					 int typmod,
					 TupleDesc tupledesc);

/* Receiver side: message dispatch. */
static void TupleQueueHandleControlMessage(TupleQueueReader *reader,
							   Size nbytes,
							   char *data);
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader,
							Size nbytes,
							HeapTupleHeader data);

/* Receiver side: replace remote typmods with local ones. */
static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader,
					 TupleDesc tupledesc,
					 RemapInfo *remapinfo,
					 HeapTuple tuple);
static Datum TupleQueueRemap(TupleQueueReader *reader,
				RemapClass remapclass,
				Datum value);
static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value);
static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value);
static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value);

/* Shared helpers. */
static RemapClass GetRemapClass(Oid typeid);
static RemapInfo *BuildRemapInfo(TupleDesc tupledesc);
/*
 * Receive a tuple from the executor and send it to the shm_mq.
 *
 * This is, at core, pretty simple: just send the tuple to the designated
 * shm_mq.  The complicated part is that if the tuple contains transient
 * record types (see lookup_rowtype_tupdesc), we need to send control
 * information to the shm_mq receiver so that those typmods can be correctly
 * interpreted, as they are merely held in a backend-local cache.  Worse, the
 * record type may not be at the top level: we could have a range over an
 * array type over a range type over a range type over an array type over a
 * record, or something like that.
 *
 * Returns true if the tuple was sent, or false if the queue reported that
 * the other end has detached.  Any other send result raises an ERROR.
 */
static bool
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
	TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
	TupleDesc	tupledesc = slot->tts_tupleDescriptor;
	HeapTuple	tuple;
	shm_mq_result result;

	/*
	 * Test to see whether the tupledesc has changed; if so, set up for the
	 * new tupledesc.  This is a strange test both because the executor really
	 * shouldn't change the tupledesc, and also because it would be unsafe if
	 * the old tupledesc could be freed and a new one allocated at the same
	 * address.  But since some very old code in printtup.c uses a similar
	 * test, we adopt it here as well.
	 */
	if (tqueue->tupledesc != tupledesc)
	{
		if (tqueue->remapinfo != NULL)
		{
			RemapInfo  *stale = tqueue->remapinfo;

			/*
			 * Clear the pointer before freeing, so that an error thrown by
			 * BuildRemapInfo below cannot leave a dangling pointer behind
			 * for tqueueDestroyReceiver to pfree a second time.
			 */
			tqueue->remapinfo = NULL;
			pfree(stale);
		}
		tqueue->remapinfo = BuildRemapInfo(tupledesc);
		tqueue->tupledesc = tupledesc;
	}

	tuple = ExecMaterializeSlot(slot);

	/*
	 * When, because of the types being transmitted, no record typmod mapping
	 * can be needed, we can skip a good deal of work.
	 */
	if (tqueue->remapinfo != NULL)
	{
		RemapInfo  *remapinfo = tqueue->remapinfo;
		AttrNumber	i;
		MemoryContext oldcontext = NULL;

		/* Deform the tuple so we can examine it, if not done already. */
		slot_getallattrs(slot);

		/* Iterate over each attribute and search it for transient typmods. */
		Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts);
		for (i = 0; i < remapinfo->natts; ++i)
		{
			/* Ignore nulls and types that don't need special handling. */
			if (slot->tts_isnull[i] ||
				remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
				continue;

			/*
			 * Switch to the temporary memory context to avoid leaking.  The
			 * context is created lazily, the first time any tuple needs it.
			 */
			if (oldcontext == NULL)
			{
				if (tqueue->tmpcontext == NULL)
					tqueue->tmpcontext =
						AllocSetContextCreate(TopMemoryContext,
											  "tqueue temporary context",
											  ALLOCSET_DEFAULT_MINSIZE,
											  ALLOCSET_DEFAULT_INITSIZE,
											  ALLOCSET_DEFAULT_MAXSIZE);
				oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
			}

			/* Invoke the appropriate walker function. */
			tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]);
		}

		/* If we used the temp context, reset it and restore prior context. */
		if (oldcontext != NULL)
		{
			MemoryContextSwitchTo(oldcontext);
			MemoryContextReset(tqueue->tmpcontext);
		}

		/*
		 * If we entered control mode, switch back to data mode.  The result
		 * of this send is not examined here; the result of the tuple send
		 * below is what gets checked.
		 */
		if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
		{
			tqueue->mode = TUPLE_QUEUE_MODE_DATA;
			shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
		}
	}

	/* Send the tuple itself. */
	result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);

	if (result == SHM_MQ_DETACHED)
		return false;
	else if (result != SHM_MQ_SUCCESS)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not send tuples")));

	return true;
}
2015-11-06 22:58:45 +01:00
/*
* Invoke the appropriate walker function based on the given RemapClass.
*/
static void
2016-06-10 00:02:36 +02:00
tqueueWalk(TQueueDestReceiver *tqueue, RemapClass walktype, Datum value)
2015-11-06 22:58:45 +01:00
{
check_stack_depth();
switch (walktype)
{
case TQUEUE_REMAP_NONE:
break;
case TQUEUE_REMAP_ARRAY:
tqueueWalkArray(tqueue, value);
break;
case TQUEUE_REMAP_RANGE:
tqueueWalkRange(tqueue, value);
break;
case TQUEUE_REMAP_RECORD:
tqueueWalkRecord(tqueue, value);
break;
}
}
/*
* Walk a record and send control messages for transient record types
* contained therein.
*/
static void
2016-06-10 00:02:36 +02:00
tqueueWalkRecord(TQueueDestReceiver *tqueue, Datum value)
2015-11-06 22:58:45 +01:00
{
HeapTupleHeader tup;
Oid typeid;
Oid typmod;
TupleDesc tupledesc;
RemapInfo *remapinfo;
/* Extract typmod from tuple. */
tup = DatumGetHeapTupleHeader(value);
typeid = HeapTupleHeaderGetTypeId(tup);
typmod = HeapTupleHeaderGetTypMod(tup);
/* Look up tuple descriptor in typecache. */
tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
/*
* If this is a transient record time, send its TupleDesc as a control
* message. (tqueueSendTypemodInfo is smart enough to do this only once
* per typmod.)
*/
if (typeid == RECORDOID)
tqueueSendTypmodInfo(tqueue, typmod, tupledesc);
/*
* Build the remap information for this tupledesc. We might want to think
* about keeping a cache of this information keyed by typeid and typemod,
* but let's keep it simple for now.
*/
remapinfo = BuildRemapInfo(tupledesc);
/*
* If remapping is required, deform the tuple and process each field. When
* BuildRemapInfo is null, the data types are such that there can be no
* transient record types here, so we can skip all this work.
*/
if (remapinfo != NULL)
{
Datum *values;
bool *isnull;
HeapTupleData tdata;
AttrNumber i;
/* Deform the tuple so we can check each column within. */
values = palloc(tupledesc->natts * sizeof(Datum));
isnull = palloc(tupledesc->natts * sizeof(bool));
tdata.t_len = HeapTupleHeaderGetDatumLength(tup);
ItemPointerSetInvalid(&(tdata.t_self));
tdata.t_tableOid = InvalidOid;
tdata.t_data = tup;
heap_deform_tuple(&tdata, tupledesc, values, isnull);
/* Recursively check each non-NULL attribute. */
for (i = 0; i < tupledesc->natts; ++i)
if (!isnull[i])
tqueueWalk(tqueue, remapinfo->mapping[i], values[i]);
}
/* Release reference count acquired by lookup_rowtype_tupdesc. */
DecrTupleDescRefCount(tupledesc);
}
/*
* Walk a record and send control messages for transient record types
* contained therein.
*/
static void
2016-06-10 00:02:36 +02:00
tqueueWalkArray(TQueueDestReceiver *tqueue, Datum value)
2015-11-06 22:58:45 +01:00
{
ArrayType *arr = DatumGetArrayTypeP(value);
Oid typeid = ARR_ELEMTYPE(arr);
RemapClass remapclass;
int16 typlen;
bool typbyval;
char typalign;
Datum *elem_values;
bool *elem_nulls;
int num_elems;
int i;
remapclass = GetRemapClass(typeid);
/*
* If the elements of the array don't need to be walked, we shouldn't have
* been called in the first place: GetRemapClass should have returned NULL
* when asked about this array type.
*/
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Deconstruct the array. */
get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign);
deconstruct_array(arr, typeid, typlen, typbyval, typalign,
&elem_values, &elem_nulls, &num_elems);
/* Walk each element. */
for (i = 0; i < num_elems; ++i)
if (!elem_nulls[i])
tqueueWalk(tqueue, remapclass, elem_values[i]);
}
/*
* Walk a range type and send control messages for transient record types
* contained therein.
*/
static void
2016-06-10 00:02:36 +02:00
tqueueWalkRange(TQueueDestReceiver *tqueue, Datum value)
2015-11-06 22:58:45 +01:00
{
RangeType *range = DatumGetRangeType(value);
Oid typeid = RangeTypeGetOid(range);
RemapClass remapclass;
TypeCacheEntry *typcache;
RangeBound lower;
RangeBound upper;
bool empty;
/*
* Extract the lower and upper bounds. It might be worth implementing
* some caching scheme here so that we don't look up the same typeids in
* the type cache repeatedly, but for now let's keep it simple.
*/
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
if (typcache->rngelemtype == NULL)
elog(ERROR, "type %u is not a range type", typeid);
range_deserialize(typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */
if (empty)
return;
/*
* If the range bounds don't need to be walked, we shouldn't have been
* called in the first place: GetRemapClass should have returned NULL when
* asked about this range type.
*/
remapclass = GetRemapClass(typcache->rngelemtype->type_id);
2015-11-06 22:58:45 +01:00
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Walk each bound, if present. */
if (!upper.infinite)
tqueueWalk(tqueue, remapclass, upper.val);
if (!lower.infinite)
tqueueWalk(tqueue, remapclass, lower.val);
}
/*
* Send tuple descriptor information for a transient typemod, unless we've
* already done so previously.
*/
static void
2016-06-10 00:02:36 +02:00
tqueueSendTypmodInfo(TQueueDestReceiver *tqueue, int typmod,
2015-11-06 22:58:45 +01:00
TupleDesc tupledesc)
{
StringInfoData buf;
bool found;
AttrNumber i;
/* Initialize hash table if not done yet. */
if (tqueue->recordhtab == NULL)
{
HASHCTL ctl;
ctl.keysize = sizeof(int);
ctl.entrysize = sizeof(int);
ctl.hcxt = TopMemoryContext;
tqueue->recordhtab = hash_create("tqueue record hashtable",
100, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2015-11-06 22:58:45 +01:00
}
/* Have we already seen this record type? If not, must report it. */
hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
if (found)
return;
/* If message queue is in data mode, switch to control mode. */
if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
{
tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false);
}
/* Assemble a control message. */
initStringInfo(&buf);
appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int));
appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid,
sizeof(bool));
for (i = 0; i < tupledesc->natts; ++i)
appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
sizeof(FormData_pg_attribute));
/* Send control message. */
shm_mq_send(tqueue->handle, buf.len, buf.data, false);
}
/*
 * rStartup callback: called before tuples are received from the executor.
 */
static void
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
{
	/* No per-run setup is needed. */
}
/*
 * rShutdown callback: detach from the queue at the end of an executor run.
 */
static void
tqueueShutdownReceiver(DestReceiver *self)
{
	shm_mq_handle *mqh = ((TQueueDestReceiver *) self)->handle;

	shm_mq_detach(shm_mq_get_queue(mqh));
}
/*
 * rDestroy callback: release the receiver and everything it owns.
 */
static void
tqueueDestroyReceiver(DestReceiver *self)
{
	TQueueDestReceiver *receiver = (TQueueDestReceiver *) self;

	/* Each of these is created lazily, so any of them may still be NULL. */
	if (receiver->tmpcontext != NULL)
		MemoryContextDelete(receiver->tmpcontext);

	if (receiver->recordhtab != NULL)
		hash_destroy(receiver->recordhtab);

	if (receiver->remapinfo != NULL)
		pfree(receiver->remapinfo);

	/* Finally, the receiver struct itself. */
	pfree(self);
}
/*
 * Build a DestReceiver that writes each received tuple to the given shm_mq.
 *
 * The receiver starts out in data mode with no remap information; the
 * temporary context and typmod hash table are created only when needed.
 */
DestReceiver *
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
{
	TQueueDestReceiver *receiver;

	receiver = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));

	/* DestReceiver callbacks. */
	receiver->pub.mydest = DestTupleQueue;
	receiver->pub.rStartup = tqueueStartupReceiver;
	receiver->pub.receiveSlot = tqueueReceiveSlot;
	receiver->pub.rShutdown = tqueueShutdownReceiver;
	receiver->pub.rDestroy = tqueueDestroyReceiver;

	/* Private state. */
	receiver->handle = handle;
	receiver->mode = TUPLE_QUEUE_MODE_DATA;
	receiver->tmpcontext = NULL;
	receiver->recordhtab = NULL;
	receiver->remapinfo = NULL;

	return &receiver->pub;
}
/*
2015-11-06 22:58:45 +01:00
* Create a tuple queue reader.
*/
2015-11-06 22:58:45 +01:00
TupleQueueReader *
CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
{
2015-11-06 22:58:45 +01:00
TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
2015-11-06 22:58:45 +01:00
reader->queue = handle;
reader->mode = TUPLE_QUEUE_MODE_DATA;
reader->tupledesc = tupledesc;
reader->remapinfo = BuildRemapInfo(tupledesc);
2015-11-06 22:58:45 +01:00
return reader;
}
/*
2015-11-06 22:58:45 +01:00
* Destroy a tuple queue reader.
*/
void
2015-11-06 22:58:45 +01:00
DestroyTupleQueueReader(TupleQueueReader *reader)
{
2015-11-06 22:58:45 +01:00
shm_mq_detach(shm_mq_get_queue(reader->queue));
if (reader->remapinfo != NULL)
pfree(reader->remapinfo);
pfree(reader);
}
/*
 * Fetch the next tuple from a tuple queue reader.
 *
 * Returns NULL when no tuples remain, or when nowait = true and no tuple is
 * available yet.  If done is not NULL, *done is set to true when no tuples
 * remain and to false otherwise.
 *
 * A returned tuple is allocated in CurrentMemoryContext.  That should be a
 * short-lived (tuple-lifespan) context, because tuple remapping leaks memory
 * into it freely.
 *
 * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
 * accumulate bytes from a partially-read message, so calling this with
 * nowait = true is useful even if nothing is returned.
 */
HeapTuple
TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
{
	if (done != NULL)
		*done = false;

	for (;;)
	{
		Size		msglen;
		void	   *msg;
		shm_mq_result rc;

		/* Try to pull one message off the queue. */
		rc = shm_mq_receive(reader->queue, &msglen, &msg, nowait);

		/*
		 * Nothing to return if the sender is gone or (in non-blocking mode)
		 * no complete message is ready; only the former means "done".
		 */
		if (rc == SHM_MQ_DETACHED || rc == SHM_MQ_WOULD_BLOCK)
		{
			if (rc == SHM_MQ_DETACHED && done != NULL)
				*done = true;
			return NULL;
		}
		Assert(rc == SHM_MQ_SUCCESS);

		/*
		 * A one-byte message is a mode switch between "control" and "data".
		 * Tuples and typmod descriptions both need more than one byte, so
		 * the length alone identifies a mode switch unambiguously.
		 */
		if (msglen == 1)
		{
			reader->mode = ((char *) msg)[0];
			continue;
		}

		switch (reader->mode)
		{
			case TUPLE_QUEUE_MODE_DATA:
				/* The message is a tuple. */
				return TupleQueueHandleDataMessage(reader, msglen, msg);

			case TUPLE_QUEUE_MODE_CONTROL:
				/* The message describes a transient record type. */
				TupleQueueHandleControlMessage(reader, msglen, msg);
				break;

			default:
				elog(ERROR, "invalid mode: %d", (int) reader->mode);
		}
	}
}
/*
2015-11-06 22:58:45 +01:00
* Handle a data message - that is, a tuple - from the remote side.
*/
2015-11-06 22:58:45 +01:00
static HeapTuple
TupleQueueHandleDataMessage(TupleQueueReader *reader,
Size nbytes,
HeapTupleHeader data)
{
2015-11-06 22:58:45 +01:00
HeapTupleData htup;
ItemPointerSetInvalid(&htup.t_self);
htup.t_tableOid = InvalidOid;
htup.t_len = nbytes;
htup.t_data = data;
return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo,
&htup);
}
/*
* Remap tuple typmods per control information received from remote side.
*/
static HeapTuple
TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc,
2016-06-10 00:02:36 +02:00
RemapInfo *remapinfo, HeapTuple tuple)
2015-11-06 22:58:45 +01:00
{
Datum *values;
bool *isnull;
int i;
/*
* If no remapping is necessary, just copy the tuple into a single
* palloc'd chunk, as caller will expect.
*/
if (remapinfo == NULL)
return heap_copytuple(tuple);
/* Deform tuple so we can remap record typmods for individual attrs. */
values = palloc(tupledesc->natts * sizeof(Datum));
isnull = palloc(tupledesc->natts * sizeof(bool));
heap_deform_tuple(tuple, tupledesc, values, isnull);
Assert(tupledesc->natts == remapinfo->natts);
/* Recursively check each non-NULL attribute. */
for (i = 0; i < tupledesc->natts; ++i)
{
2015-11-06 22:58:45 +01:00
if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE)
continue;
values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]);
}
2015-11-06 22:58:45 +01:00
/* Reform the modified tuple. */
return heap_form_tuple(tupledesc, values, isnull);
}
/*
 * Remap a single value according to its remap class.
 *
 * The per-class functions call back into this one for nested values, so the
 * stack depth is checked on every entry.
 */
static Datum
TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value)
{
	check_stack_depth();

	/* Callers probably shouldn't pass NONE at all, but tolerate it. */
	if (remapclass == TQUEUE_REMAP_NONE)
		return value;

	if (remapclass == TQUEUE_REMAP_ARRAY)
		return TupleQueueRemapArray(reader, value);

	if (remapclass == TQUEUE_REMAP_RANGE)
		return TupleQueueRemapRange(reader, value);

	if (remapclass == TQUEUE_REMAP_RECORD)
		return TupleQueueRemapRecord(reader, value);

	elog(ERROR, "unknown remap class: %d", (int) remapclass);
	return (Datum) 0;
}
2015-11-06 22:58:45 +01:00
/*
 * Remap an array: remap every non-null element and rebuild the array with
 * the original dimensions and lower bounds.
 */
static Datum
TupleQueueRemapArray(TupleQueueReader *reader, Datum value)
{
	ArrayType  *array = DatumGetArrayTypeP(value);
	Oid			elemtype = ARR_ELEMTYPE(array);
	RemapClass	elemclass = GetRemapClass(elemtype);
	int16		elemlen;
	bool		elembyval;
	char		elemalign;
	Datum	   *elems;
	bool	   *nulls;
	int			nelems;
	int			idx;

	/*
	 * If the elements of the array don't need remapping, we shouldn't have
	 * been called in the first place: GetRemapClass should have returned
	 * TQUEUE_REMAP_NONE when asked about this array type.
	 */
	Assert(elemclass != TQUEUE_REMAP_NONE);

	/* Break the array apart into its elements. */
	get_typlenbyvalalign(elemtype, &elemlen, &elembyval, &elemalign);
	deconstruct_array(array, elemtype, elemlen, elembyval, elemalign,
					  &elems, &nulls, &nelems);

	/* Replace each non-null element with its remapped form. */
	for (idx = 0; idx < nelems; idx++)
	{
		if (nulls[idx])
			continue;
		elems[idx] = TupleQueueRemap(reader, elemclass, elems[idx]);
	}

	/* Put the array back together. */
	array = construct_md_array(elems, nulls,
							   ARR_NDIM(array),
							   ARR_DIMS(array),
							   ARR_LBOUND(array),
							   elemtype, elemlen, elembyval, elemalign);

	return PointerGetDatum(array);
}
/*
2015-11-06 22:58:45 +01:00
* Remap a range type.
*/
2015-11-06 22:58:45 +01:00
static Datum
TupleQueueRemapRange(TupleQueueReader *reader, Datum value)
{
2015-11-06 22:58:45 +01:00
RangeType *range = DatumGetRangeType(value);
Oid typeid = RangeTypeGetOid(range);
RemapClass remapclass;
TypeCacheEntry *typcache;
RangeBound lower;
RangeBound upper;
bool empty;
/*
* Extract the lower and upper bounds. As in tqueueWalkRange, some
* caching might be a good idea here.
*/
typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO);
if (typcache->rngelemtype == NULL)
elog(ERROR, "type %u is not a range type", typeid);
range_deserialize(typcache, range, &lower, &upper, &empty);
/* Nothing to do for an empty range. */
if (empty)
return value;
/*
* If the range bounds don't need to be walked, we shouldn't have been
* called in the first place: GetRemapClass should have returned NULL when
* asked about this range type.
*/
remapclass = GetRemapClass(typcache->rngelemtype->type_id);
2015-11-06 22:58:45 +01:00
Assert(remapclass != TQUEUE_REMAP_NONE);
/* Remap each bound, if present. */
if (!upper.infinite)
upper.val = TupleQueueRemap(reader, remapclass, upper.val);
if (!lower.infinite)
lower.val = TupleQueueRemap(reader, remapclass, lower.val);
/* And reserialize. */
range = range_serialize(typcache, &lower, &upper, empty);
return RangeTypeGetDatum(range);
}
2015-11-06 22:58:45 +01:00
/*
 * Remap a record.
 *
 * If the value is a transient record (type RECORDOID), its remote typmod is
 * replaced by the local typmod registered for it by an earlier control
 * message; an ERROR is raised if no such mapping exists.  The record's
 * fields are then remapped recursively.
 *
 * Returns a newly built composite datum carrying the local type OID and
 * typmod.
 */
static Datum
TupleQueueRemapRecord(TupleQueueReader *reader, Datum value)
{
	HeapTupleHeader tup;
	Oid			typeid;
	int			typmod;
	RecordTypemodMap *mapent;
	TupleDesc	tupledesc;
	RemapInfo  *remapinfo;
	HeapTupleData htup;
	HeapTuple	atup;

	/* Fetch type OID and typmod. */
	tup = DatumGetHeapTupleHeader(value);
	typeid = HeapTupleHeaderGetTypeId(tup);
	typmod = HeapTupleHeaderGetTypMod(tup);

	/* If transient record, replace remote typmod with local typmod. */
	if (typeid == RECORDOID)
	{
		Assert(reader->typmodmap != NULL);
		mapent = hash_search(reader->typmodmap, &typmod,
							 HASH_FIND, NULL);
		if (mapent == NULL)
			elog(ERROR, "found unrecognized remote typmod %d", typmod);
		typmod = mapent->localtypmod;
	}

	/*
	 * Fetch tupledesc and compute remap info.  We should probably cache this
	 * so that we don't have to keep recomputing it.
	 */
	tupledesc = lookup_rowtype_tupdesc(typeid, typmod);
	remapinfo = BuildRemapInfo(tupledesc);

	/* Remap tuple. */
	ItemPointerSetInvalid(&htup.t_self);
	htup.t_tableOid = InvalidOid;
	htup.t_len = HeapTupleHeaderGetDatumLength(tup);
	htup.t_data = tup;
	atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup);

	/*
	 * BuildRemapInfo allocates in TopMemoryContext, so nothing else will
	 * reclaim this; free it now that the tuple has been rebuilt.
	 */
	if (remapinfo != NULL)
		pfree(remapinfo);

	/*
	 * Release the reference count acquired by lookup_rowtype_tupdesc.  This
	 * must not happen until we are done using the tupledesc.
	 */
	DecrTupleDescRefCount(tupledesc);

	/*
	 * Stamp the datum header fields.  The length must be that of the tuple
	 * we are returning, which is not guaranteed to match the length of the
	 * tuple we were given once its fields have been rebuilt.
	 */
	HeapTupleHeaderSetTypeId(atup->t_data, typeid);
	HeapTupleHeaderSetTypMod(atup->t_data, typmod);
	HeapTupleHeaderSetDatumLength(atup->t_data, atup->t_len);

	/* And return the results. */
	return HeapTupleHeaderGetDatum(atup->t_data);
}
2015-11-06 22:58:45 +01:00
/*
 * Process a control message received from the remote side.
 *
 * Control messages are sent when the remote side is sending tuples that
 * contain transient record types.  Each one carries a remote typmod and the
 * description of the corresponding tuple descriptor; we bless an equivalent
 * descriptor locally and remember the remote-to-local typmod translation.
 *
 * Message layout: int typmod, int natts, bool hasoid, followed by natts
 * FormData_pg_attribute images.
 */
static void
TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes,
							   char *data)
{
	char	   *cursor = data;
	int			consumed = 0;
	int			remote_typmod;
	int			nattrs;
	bool		has_oid;
	Form_pg_attribute *attlist;
	MemoryContext callercxt;
	TupleDesc	desc;
	RecordTypemodMap *entry;
	bool		exists;
	int			attno;

	/* Fixed-size header: typmod, attribute count, hasoid flag. */
	memcpy(&remote_typmod, &cursor[consumed], sizeof(int));
	consumed += sizeof(int);
	memcpy(&nattrs, &cursor[consumed], sizeof(int));
	consumed += sizeof(int);
	memcpy(&has_oid, &cursor[consumed], sizeof(bool));
	consumed += sizeof(bool);

	/* Copy out the attributes, allocating them in CurTransactionContext. */
	callercxt = MemoryContextSwitchTo(CurTransactionContext);
	attlist = palloc(nattrs * sizeof(Form_pg_attribute));
	for (attno = 0; attno < nattrs; attno++)
	{
		attlist[attno] = palloc(sizeof(FormData_pg_attribute));
		memcpy(attlist[attno], &cursor[consumed],
			   sizeof(FormData_pg_attribute));
		consumed += sizeof(FormData_pg_attribute);
	}
	MemoryContextSwitchTo(callercxt);

	/* The message should now be fully consumed. */
	Assert(consumed == nbytes);

	/* Build the descriptor and bless it. */
	desc = CreateTupleDesc(nattrs, has_oid, attlist);
	desc = BlessTupleDesc(desc);

	/* First control message for this reader: set up the typmod map. */
	if (reader->typmodmap == NULL)
	{
		HASHCTL		hctl;

		hctl.keysize = sizeof(int);
		hctl.entrysize = sizeof(RecordTypemodMap);
		hctl.hcxt = CurTransactionContext;
		reader->typmodmap = hash_create("typmodmap hashtable",
										100, &hctl,
									  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
	}

	/* Enter the translation; each remote typmod may be described only once. */
	entry = hash_search(reader->typmodmap, &remote_typmod, HASH_ENTER,
						&exists);
	if (exists)
		elog(ERROR, "duplicate message for typmod %d",
			 remote_typmod);
	entry->localtypmod = desc->tdtypmod;

	elog(DEBUG3, "mapping remote typmod %d to local typmod %d",
		 remote_typmod, desc->tdtypmod);
}
/*
 * Compute, for every attribute described by a tupledesc, which remapping
 * class applies to it.
 *
 * Returns NULL when no attribute needs remapping.  Otherwise the result is
 * allocated in TopMemoryContext, and the caller is responsible for freeing
 * it with pfree.
 */
static RemapInfo *
BuildRemapInfo(TupleDesc tupledesc)
{
	int			natts = tupledesc->natts;
	Size		nbytes = offsetof(RemapInfo, mapping) +
		sizeof(RemapClass) * natts;
	RemapInfo  *info = MemoryContextAllocZero(TopMemoryContext, nbytes);
	bool		anyremap = false;
	int			attno;

	info->natts = natts;

	for (attno = 0; attno < natts; attno++)
	{
		Form_pg_attribute att = tupledesc->attrs[attno];
		RemapClass	attclass;

		/* Dropped columns are never examined. */
		if (att->attisdropped)
			attclass = TQUEUE_REMAP_NONE;
		else
			attclass = GetRemapClass(att->atttypid);

		info->mapping[attno] = attclass;
		if (attclass != TQUEUE_REMAP_NONE)
			anyremap = true;
	}

	/* An all-NONE mapping is represented as NULL. */
	if (!anyremap)
	{
		pfree(info);
		return NULL;
	}

	return info;
}
/*
 * Determine the remap class associated with a particular data type.
 *
 * Transient record types need to have the typmod applied on the sending side
 * replaced with a value on the receiving side that has the same meaning.
 *
 * Arrays, range types, and all record types (including named composite
 * types) need to be searched for transient record values buried within
 * them.  Surprisingly, a walker is required even when the indicated type is
 * a composite type, because the actual value may be a compatible transient
 * record type.
 *
 * The type is peeled layer by layer (domain, array, range) until something
 * that is none of those is reached.  If that innermost type is not a record
 * type the answer is TQUEUE_REMAP_NONE; otherwise it is the class of the
 * outermost array or range layer if there was one, else the innermost class.
 */
static RemapClass
GetRemapClass(Oid typeid)
{
	RemapClass	containerClass = TQUEUE_REMAP_NONE; /* outermost array/range */
	RemapClass	leafClass = TQUEUE_REMAP_NONE;		/* innermost type's class */

	for (;;)
	{
		HeapTuple	typtup;
		Form_pg_type typform;
		char		typtype;
		Oid			basetype;
		Oid			elemtype;
		bool		is_array;

		/* The pseudo-types can be classified without a catalog lookup. */
		if (typeid == RECORDOID)
		{
			leafClass = TQUEUE_REMAP_RECORD;
			break;
		}
		if (typeid == RECORDARRAYOID)
		{
			leafClass = TQUEUE_REMAP_ARRAY;
			break;
		}

		/* Everything else requires a look at pg_type. */
		typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid));
		if (!HeapTupleIsValid(typtup))
			elog(ERROR, "cache lookup failed for type %u", typeid);
		typform = (Form_pg_type) GETSTRUCT(typtup);

		/* Copy out what we need so the cache entry can be released at once. */
		typtype = typform->typtype;
		basetype = typform->typbasetype;
		elemtype = typform->typelem;
		is_array = OidIsValid(elemtype) && typform->typlen == -1;
		ReleaseSysCache(typtup);

		/* Domain: continue with the underlying base type. */
		if (typtype == TYPTYPE_DOMAIN)
		{
			typeid = basetype;
			continue;
		}

		/*
		 * Array: continue with the element type.  If this is the outermost
		 * container, the final answer can only be TQUEUE_REMAP_ARRAY or
		 * TQUEUE_REMAP_NONE.  (An array of integers, for example, does not
		 * need to be walked.)
		 */
		if (is_array)
		{
			typeid = elemtype;
			if (containerClass == TQUEUE_REMAP_NONE)
				containerClass = TQUEUE_REMAP_ARRAY;
			continue;
		}

		/*
		 * Range: likewise continue with the subtype; the final answer can
		 * only be TQUEUE_REMAP_RANGE or TQUEUE_REMAP_NONE.
		 */
		if (typtype == TYPTYPE_RANGE)
		{
			if (containerClass == TQUEUE_REMAP_NONE)
				containerClass = TQUEUE_REMAP_RANGE;
			typeid = get_range_subtype(typeid);
			continue;
		}

		/* Composite types get walked; nothing else needs special handling. */
		if (typtype == TYPTYPE_COMPOSITE)
			leafClass = TQUEUE_REMAP_RECORD;
		break;
	}

	/* No record type at the bottom means nothing to do at any level. */
	if (leafClass == TQUEUE_REMAP_NONE)
		return TQUEUE_REMAP_NONE;

	return (containerClass != TQUEUE_REMAP_NONE) ? containerClass : leafClass;
}