diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 5f589614dc..850c67e022 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,11 +36,13 @@ #include "executor/nodeGather.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" +#include "miscadmin.h" #include "utils/memutils.h" #include "utils/rel.h" static TupleTableSlot *gather_getnext(GatherState *gatherstate); +static HeapTuple gather_readnext(GatherState *gatherstate); static void ExecShutdownGatherWorkers(GatherState *node); @@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) TupleTableSlot * ExecGather(GatherState *node) { + TupleTableSlot *fslot = node->funnel_slot; int i; TupleTableSlot *slot; TupleTableSlot *resultSlot; @@ -148,6 +151,7 @@ ExecGather(GatherState *node) */ if (gather->num_workers > 0 && IsInParallelMode()) { + ParallelContext *pcxt; bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ @@ -160,18 +164,26 @@ ExecGather(GatherState *node) * Register backend workers. We might not get as many as we * requested, or indeed any at all. */ - LaunchParallelWorkers(node->pei->pcxt); + pcxt = node->pei->pcxt; + LaunchParallelWorkers(pcxt); - /* Set up a tuple queue to collect the results. */ - node->funnel = CreateTupleQueueFunnel(); - for (i = 0; i < node->pei->pcxt->nworkers; ++i) + /* Set up tuple queue readers to read the results. */ + if (pcxt->nworkers > 0) { - if (node->pei->pcxt->worker[i].bgwhandle) + node->nreaders = 0; + node->reader = + palloc(pcxt->nworkers * sizeof(TupleQueueReader *)); + + for (i = 0; i < pcxt->nworkers; ++i) { + if (pcxt->worker[i].bgwhandle == NULL) + continue; + shm_mq_set_handle(node->pei->tqueue[i], - node->pei->pcxt->worker[i].bgwhandle); - RegisterTupleQueueOnFunnel(node->funnel, - node->pei->tqueue[i]); + pcxt->worker[i].bgwhandle); + node->reader[node->nreaders++] = + CreateTupleQueueReader(node->pei->tqueue[i], + fslot->tts_tupleDescriptor); got_any_worker = true; } } @@ -182,7 +194,7 @@ ExecGather(GatherState *node) } /* Run plan locally if no workers or not single-copy. */ - node->need_to_scan_locally = (node->funnel == NULL) + node->need_to_scan_locally = (node->reader == NULL) || !gather->single_copy; node->initialized = true; } @@ -254,13 +266,9 @@ ExecEndGather(GatherState *node) } /* - * gather_getnext - * - * Get the next tuple from shared memory queue. This function - * is responsible for fetching tuples from all the queues associated - * with worker backends used in Gather node execution and if there is - * no data available from queues or no worker is available, it does - * fetch the data from local node. + * Read the next tuple. We might fetch a tuple from one of the tuple queues + * using gather_readnext, or if no tuple queue contains a tuple and the + * single_copy flag is not set, we might generate one locally instead. */ static TupleTableSlot * gather_getnext(GatherState *gatherstate) @@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate) TupleTableSlot *fslot = gatherstate->funnel_slot; HeapTuple tup; - while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally) + while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { - if (gatherstate->funnel != NULL) + if (gatherstate->reader != NULL) { - bool done = false; - - /* wait only if local scan is done */ - tup = TupleQueueFunnelNext(gatherstate->funnel, - gatherstate->need_to_scan_locally, - &done); - if (done) - ExecShutdownGatherWorkers(gatherstate); + tup = gather_readnext(gatherstate); if (HeapTupleIsValid(tup)) { @@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate) return ExecClearTuple(fslot); } +/* + * Attempt to read a tuple from one of our parallel workers. + */ +static HeapTuple +gather_readnext(GatherState *gatherstate) +{ + int waitpos = gatherstate->nextreader; + + for (;;) + { + TupleQueueReader *reader; + HeapTuple tup; + bool readerdone; + + /* Make sure we've read all messages from workers. */ + HandleParallelMessages(); + + /* Attempt to read a tuple, but don't block if none is available. */ + reader = gatherstate->reader[gatherstate->nextreader]; + tup = TupleQueueReaderNext(reader, true, &readerdone); + + /* + * If this reader is done, remove it. If all readers are done, + * clean up remaining worker state. + */ + if (readerdone) + { + DestroyTupleQueueReader(reader); + --gatherstate->nreaders; + if (gatherstate->nreaders == 0) + { + ExecShutdownGather(gatherstate); + return NULL; + } + else + { + memmove(&gatherstate->reader[gatherstate->nextreader], + &gatherstate->reader[gatherstate->nextreader + 1], + sizeof(TupleQueueReader *) + * (gatherstate->nreaders - gatherstate->nextreader)); + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; + if (gatherstate->nextreader < waitpos) + --waitpos; + } + continue; + } + + /* Advance nextreader pointer in round-robin fashion. */ + gatherstate->nextreader = + (gatherstate->nextreader + 1) % gatherstate->nreaders; + + /* If we got a tuple, return it. */ + if (tup) + return tup; + + /* Have we visited every TupleQueueReader? */ + if (gatherstate->nextreader == waitpos) + { + /* + * If (still) running plan locally, return NULL so caller can + * generate another tuple from the local copy of the plan. + */ + if (gatherstate->need_to_scan_locally) + return NULL; + + /* Nothing to do except wait for developments. */ + WaitLatch(MyLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + } + } +} + /* ---------------------------------------------------------------- * ExecShutdownGatherWorkers * @@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate) void ExecShutdownGatherWorkers(GatherState *node) { - /* Shut down tuple queue funnel before shutting down workers. */ - if (node->funnel != NULL) + /* Shut down tuple queue readers before shutting down workers. */ + if (node->reader != NULL) { - DestroyTupleQueueFunnel(node->funnel); - node->funnel = NULL; + int i; + + for (i = 0; i < node->nreaders; ++i) + DestroyTupleQueueReader(node->reader[i]); + node->reader = NULL; } /* Now shut down the workers. */ diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 67143d33da..f465b1db8b 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -4,10 +4,15 @@ * Use shm_mq to send & receive tuples between parallel backends * * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver - * under the hood, writes tuples from the executor to a shm_mq. + * under the hood, writes tuples from the executor to a shm_mq. If + * necessary, it also writes control messages describing transient + * record types used within the tuple. * - * A TupleQueueFunnel helps manage the process of reading tuples from - * one or more shm_mq objects being used as tuple queues. + * A TupleQueueReader reads tuples, and if any are sent control messages, + * from a shm_mq and returns the tuples. If transient record types are + * in use, it registers those types based on the received control messages + * and rewrites the typemods sent by the remote side to the corresponding + * local record typemods. * * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -21,36 +26,403 @@ #include "postgres.h" #include "access/htup_details.h" +#include "catalog/pg_type.h" #include "executor/tqueue.h" +#include "funcapi.h" +#include "lib/stringinfo.h" #include "miscadmin.h" +#include "utils/array.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/rangetypes.h" +#include "utils/syscache.h" +#include "utils/typcache.h" + +typedef enum +{ + TQUEUE_REMAP_NONE, /* no special processing required */ + TQUEUE_REMAP_ARRAY, /* array */ + TQUEUE_REMAP_RANGE, /* range */ + TQUEUE_REMAP_RECORD /* composite type, named or anonymous */ +} RemapClass; + +typedef struct +{ + int natts; + RemapClass mapping[FLEXIBLE_ARRAY_MEMBER]; +} RemapInfo; typedef struct { DestReceiver pub; shm_mq_handle *handle; + MemoryContext tmpcontext; + HTAB *recordhtab; + char mode; + TupleDesc tupledesc; + RemapInfo *remapinfo; } TQueueDestReceiver; -struct TupleQueueFunnel +typedef struct RecordTypemodMap { - int nqueues; - int maxqueues; - int nextqueue; - shm_mq_handle **queue; + int remotetypmod; + int localtypmod; +} RecordTypemodMap; + +struct TupleQueueReader +{ + shm_mq_handle *queue; + char mode; + TupleDesc tupledesc; + RemapInfo *remapinfo; + HTAB *typmodmap; }; +#define TUPLE_QUEUE_MODE_CONTROL 'c' +#define TUPLE_QUEUE_MODE_DATA 'd' + +static void tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype, + Datum value); +static void tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value); +static void tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value); +static void tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value); +static void tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod, + TupleDesc tupledesc); +static void TupleQueueHandleControlMessage(TupleQueueReader *reader, + Size nbytes, char *data); +static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, + Size nbytes, HeapTupleHeader data); +static HeapTuple TupleQueueRemapTuple(TupleQueueReader *reader, + TupleDesc tupledesc, RemapInfo * remapinfo, + HeapTuple tuple); +static Datum TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, + Datum value); +static Datum TupleQueueRemapArray(TupleQueueReader *reader, Datum value); +static Datum TupleQueueRemapRange(TupleQueueReader *reader, Datum value); +static Datum TupleQueueRemapRecord(TupleQueueReader *reader, Datum value); +static RemapClass GetRemapClass(Oid typeid); +static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); + /* * Receive a tuple. + * + * This is, at core, pretty simple: just send the tuple to the designated + * shm_mq. The complicated part is that if the tuple contains transient + * record types (see lookup_rowtype_tupdesc), we need to send control + * information to the shm_mq receiver so that those typemods can be correctly + * interpreted, as they are merely held in a backend-local cache. Worse, the + * record type may not at the top level: we could have a range over an array + * type over a range type over a range type over an array type over a record, + * or something like that. */ static void tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + TupleDesc tupledesc = slot->tts_tupleDescriptor; HeapTuple tuple; + HeapTupleHeader tup; + + /* + * Test to see whether the tupledesc has changed; if so, set up for the + * new tupledesc. This is a strange test both because the executor really + * shouldn't change the tupledesc, and also because it would be unsafe if + * the old tupledesc could be freed and a new one allocated at the same + * address. But since some very old code in printtup.c uses this test, we + * adopt it here as well. + */ + if (tqueue->tupledesc != tupledesc || + tqueue->remapinfo->natts != tupledesc->natts) + { + if (tqueue->remapinfo != NULL) + pfree(tqueue->remapinfo); + tqueue->remapinfo = BuildRemapInfo(tupledesc); + } tuple = ExecMaterializeSlot(slot); + tup = tuple->t_data; + + /* + * When, because of the types being transmitted, no record typemod mapping + * can be needed, we can skip a good deal of work. + */ + if (tqueue->remapinfo != NULL) + { + RemapInfo *remapinfo = tqueue->remapinfo; + AttrNumber i; + MemoryContext oldcontext = NULL; + + /* Deform the tuple so we can examine it, if not done already. */ + slot_getallattrs(slot); + + /* Iterate over each attribute and search it for transient typemods. */ + Assert(slot->tts_tupleDescriptor->natts == remapinfo->natts); + for (i = 0; i < remapinfo->natts; ++i) + { + /* Ignore nulls and types that don't need special handling. */ + if (slot->tts_isnull[i] || + remapinfo->mapping[i] == TQUEUE_REMAP_NONE) + continue; + + /* Switch to temporary memory context to avoid leaking. */ + if (oldcontext == NULL) + { + if (tqueue->tmpcontext == NULL) + tqueue->tmpcontext = + AllocSetContextCreate(TopMemoryContext, + "tqueue temporary context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext); + } + + /* Invoke the appropriate walker function. */ + tqueueWalk(tqueue, remapinfo->mapping[i], slot->tts_values[i]); + } + + /* If we used the temp context, reset it and restore prior context. */ + if (oldcontext != NULL) + { + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(tqueue->tmpcontext); + } + + /* If we entered control mode, switch back to data mode. */ + if (tqueue->mode != TUPLE_QUEUE_MODE_DATA) + { + tqueue->mode = TUPLE_QUEUE_MODE_DATA; + shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); + } + } + + /* Send the tuple itself. */ shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); } +/* + * Invoke the appropriate walker function based on the given RemapClass. + */ +static void +tqueueWalk(TQueueDestReceiver * tqueue, RemapClass walktype, Datum value) +{ + check_stack_depth(); + + switch (walktype) + { + case TQUEUE_REMAP_NONE: + break; + case TQUEUE_REMAP_ARRAY: + tqueueWalkArray(tqueue, value); + break; + case TQUEUE_REMAP_RANGE: + tqueueWalkRange(tqueue, value); + break; + case TQUEUE_REMAP_RECORD: + tqueueWalkRecord(tqueue, value); + break; + } +} + +/* + * Walk a record and send control messages for transient record types + * contained therein. + */ +static void +tqueueWalkRecord(TQueueDestReceiver * tqueue, Datum value) +{ + HeapTupleHeader tup; + Oid typeid; + Oid typmod; + TupleDesc tupledesc; + RemapInfo *remapinfo; + + /* Extract typmod from tuple. */ + tup = DatumGetHeapTupleHeader(value); + typeid = HeapTupleHeaderGetTypeId(tup); + typmod = HeapTupleHeaderGetTypMod(tup); + + /* Look up tuple descriptor in typecache. */ + tupledesc = lookup_rowtype_tupdesc(typeid, typmod); + + /* + * If this is a transient record time, send its TupleDesc as a control + * message. (tqueueSendTypemodInfo is smart enough to do this only once + * per typmod.) + */ + if (typeid == RECORDOID) + tqueueSendTypmodInfo(tqueue, typmod, tupledesc); + + /* + * Build the remap information for this tupledesc. We might want to think + * about keeping a cache of this information keyed by typeid and typemod, + * but let's keep it simple for now. + */ + remapinfo = BuildRemapInfo(tupledesc); + + /* + * If remapping is required, deform the tuple and process each field. When + * BuildRemapInfo is null, the data types are such that there can be no + * transient record types here, so we can skip all this work. + */ + if (remapinfo != NULL) + { + Datum *values; + bool *isnull; + HeapTupleData tdata; + AttrNumber i; + + /* Deform the tuple so we can check each column within. */ + values = palloc(tupledesc->natts * sizeof(Datum)); + isnull = palloc(tupledesc->natts * sizeof(bool)); + tdata.t_len = HeapTupleHeaderGetDatumLength(tup); + ItemPointerSetInvalid(&(tdata.t_self)); + tdata.t_tableOid = InvalidOid; + tdata.t_data = tup; + heap_deform_tuple(&tdata, tupledesc, values, isnull); + + /* Recursively check each non-NULL attribute. */ + for (i = 0; i < tupledesc->natts; ++i) + if (!isnull[i]) + tqueueWalk(tqueue, remapinfo->mapping[i], values[i]); + } + + /* Release reference count acquired by lookup_rowtype_tupdesc. */ + DecrTupleDescRefCount(tupledesc); +} + +/* + * Walk a record and send control messages for transient record types + * contained therein. + */ +static void +tqueueWalkArray(TQueueDestReceiver * tqueue, Datum value) +{ + ArrayType *arr = DatumGetArrayTypeP(value); + Oid typeid = ARR_ELEMTYPE(arr); + RemapClass remapclass; + int16 typlen; + bool typbyval; + char typalign; + Datum *elem_values; + bool *elem_nulls; + int num_elems; + int i; + + remapclass = GetRemapClass(typeid); + + /* + * If the elements of the array don't need to be walked, we shouldn't have + * been called in the first place: GetRemapClass should have returned NULL + * when asked about this array type. + */ + Assert(remapclass != TQUEUE_REMAP_NONE); + + /* Deconstruct the array. */ + get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); + deconstruct_array(arr, typeid, typlen, typbyval, typalign, + &elem_values, &elem_nulls, &num_elems); + + /* Walk each element. */ + for (i = 0; i < num_elems; ++i) + if (!elem_nulls[i]) + tqueueWalk(tqueue, remapclass, elem_values[i]); +} + +/* + * Walk a range type and send control messages for transient record types + * contained therein. + */ +static void +tqueueWalkRange(TQueueDestReceiver * tqueue, Datum value) +{ + RangeType *range = DatumGetRangeType(value); + Oid typeid = RangeTypeGetOid(range); + RemapClass remapclass; + TypeCacheEntry *typcache; + RangeBound lower; + RangeBound upper; + bool empty; + + /* + * Extract the lower and upper bounds. It might be worth implementing + * some caching scheme here so that we don't look up the same typeids in + * the type cache repeatedly, but for now let's keep it simple. + */ + typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO); + if (typcache->rngelemtype == NULL) + elog(ERROR, "type %u is not a range type", typeid); + range_deserialize(typcache, range, &lower, &upper, &empty); + + /* Nothing to do for an empty range. */ + if (empty) + return; + + /* + * If the range bounds don't need to be walked, we shouldn't have been + * called in the first place: GetRemapClass should have returned NULL when + * asked about this range type. + */ + remapclass = GetRemapClass(typeid); + Assert(remapclass != TQUEUE_REMAP_NONE); + + /* Walk each bound, if present. */ + if (!upper.infinite) + tqueueWalk(tqueue, remapclass, upper.val); + if (!lower.infinite) + tqueueWalk(tqueue, remapclass, lower.val); +} + +/* + * Send tuple descriptor information for a transient typemod, unless we've + * already done so previously. + */ +static void +tqueueSendTypmodInfo(TQueueDestReceiver * tqueue, int typmod, + TupleDesc tupledesc) +{ + StringInfoData buf; + bool found; + AttrNumber i; + + /* Initialize hash table if not done yet. */ + if (tqueue->recordhtab == NULL) + { + HASHCTL ctl; + + ctl.keysize = sizeof(int); + ctl.entrysize = sizeof(int); + ctl.hcxt = TopMemoryContext; + tqueue->recordhtab = hash_create("tqueue record hashtable", + 100, &ctl, HASH_ELEM | HASH_CONTEXT); + } + + /* Have we already seen this record type? If not, must report it. */ + hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found); + if (found) + return; + + /* If message queue is in data mode, switch to control mode. */ + if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL) + { + tqueue->mode = TUPLE_QUEUE_MODE_CONTROL; + shm_mq_send(tqueue->handle, sizeof(char), &tqueue->mode, false); + } + + /* Assemble a control message. */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int)); + appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int)); + appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, + sizeof(bool)); + for (i = 0; i < tupledesc->natts; ++i) + appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i], + sizeof(FormData_pg_attribute)); + + /* Send control message. */ + shm_mq_send(tqueue->handle, buf.len, buf.data, false); +} + /* * Prepare to receive tuples from executor. */ @@ -77,6 +449,14 @@ tqueueShutdownReceiver(DestReceiver *self) static void tqueueDestroyReceiver(DestReceiver *self) { + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + + if (tqueue->tmpcontext != NULL) + MemoryContextDelete(tqueue->tmpcontext); + if (tqueue->recordhtab != NULL) + hash_destroy(tqueue->recordhtab); + if (tqueue->remapinfo != NULL) + pfree(tqueue->remapinfo); pfree(self); } @@ -96,169 +476,542 @@ CreateTupleQueueDestReceiver(shm_mq_handle *handle) self->pub.rDestroy = tqueueDestroyReceiver; self->pub.mydest = DestTupleQueue; self->handle = handle; + self->tmpcontext = NULL; + self->recordhtab = NULL; + self->mode = TUPLE_QUEUE_MODE_DATA; + self->remapinfo = NULL; return (DestReceiver *) self; } /* - * Create a tuple queue funnel. + * Create a tuple queue reader. */ -TupleQueueFunnel * -CreateTupleQueueFunnel(void) +TupleQueueReader * +CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) { - TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel)); + TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader)); - funnel->maxqueues = 8; - funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *)); + reader->queue = handle; + reader->mode = TUPLE_QUEUE_MODE_DATA; + reader->tupledesc = tupledesc; + reader->remapinfo = BuildRemapInfo(tupledesc); - return funnel; + return reader; } /* - * Destroy a tuple queue funnel. + * Destroy a tuple queue reader. */ void -DestroyTupleQueueFunnel(TupleQueueFunnel *funnel) +DestroyTupleQueueReader(TupleQueueReader *reader) { - int i; - - for (i = 0; i < funnel->nqueues; i++) - shm_mq_detach(shm_mq_get_queue(funnel->queue[i])); - pfree(funnel->queue); - pfree(funnel); + shm_mq_detach(shm_mq_get_queue(reader->queue)); + if (reader->remapinfo != NULL) + pfree(reader->remapinfo); + pfree(reader); } /* - * Remember the shared memory queue handle in funnel. - */ -void -RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle) -{ - if (funnel->nqueues < funnel->maxqueues) - { - funnel->queue[funnel->nqueues++] = handle; - return; - } - - if (funnel->nqueues >= funnel->maxqueues) - { - int newsize = funnel->nqueues * 2; - - Assert(funnel->nqueues == funnel->maxqueues); - - funnel->queue = repalloc(funnel->queue, - newsize * sizeof(shm_mq_handle *)); - funnel->maxqueues = newsize; - } - - funnel->queue[funnel->nqueues++] = handle; -} - -/* - * Fetch a tuple from a tuple queue funnel. + * Fetch a tuple from a tuple queue reader. * - * We try to read from the queues in round-robin fashion so as to avoid - * the situation where some workers get their tuples read expediently while - * others are barely ever serviced. - * - * Even when nowait = false, we read from the individual queues in - * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, - * it can still accumulate bytes from a partially-read message, so doing it - * this way should outperform doing a blocking read on each queue in turn. + * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still + * accumulate bytes from a partially-read message, so it's useful to call + * this with nowait = true even if nothing is returned. * * The return value is NULL if there are no remaining queues or if - * nowait = true and no queue returned a tuple without blocking. *done, if - * not NULL, is set to true when there are no remaining queues and false in - * any other case. + * nowait = true and no tuple is ready to return. *done, if not NULL, + * is set to true when queue is detached and otherwise to false. */ HeapTuple -TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done) +TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) { - int waitpos = funnel->nextqueue; - - /* Corner case: called before adding any queues, or after all are gone. */ - if (funnel->nqueues == 0) - { - if (done != NULL) - *done = true; - return NULL; - } + shm_mq_result result; if (done != NULL) *done = false; for (;;) { - shm_mq_handle *mqh = funnel->queue[funnel->nextqueue]; - shm_mq_result result; Size nbytes; void *data; /* Attempt to read a message. */ - result = shm_mq_receive(mqh, &nbytes, &data, true); + result = shm_mq_receive(reader->queue, &nbytes, &data, true); + + /* If queue is detached, set *done and return NULL. */ + if (result == SHM_MQ_DETACHED) + { + if (done != NULL) + *done = true; + return NULL; + } + + /* In non-blocking mode, bail out if no message ready yet. */ + if (result == SHM_MQ_WOULD_BLOCK) + return NULL; + Assert(result == SHM_MQ_SUCCESS); /* - * Normally, we advance funnel->nextqueue to the next queue at this - * point, but if we're pointing to a queue that we've just discovered - * is detached, then forget that queue and leave the pointer where it - * is until the number of remaining queues fall below that pointer and - * at that point make the pointer point to the first queue. + * OK, we got a message. Process it. + * + * One-byte messages are mode switch messages, so that we can switch + * between "control" and "data" mode. When in "data" mode, each + * message (unless exactly one byte) is a tuple. When in "control" + * mode, each message provides a transient-typmod-to-tupledesc mapping + * so we can interpret future tuples. */ - if (result != SHM_MQ_DETACHED) - funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues; - else + if (nbytes == 1) { - --funnel->nqueues; - if (funnel->nqueues == 0) - { - if (done != NULL) - *done = true; - return NULL; - } + /* Mode switch message. */ + reader->mode = ((char *) data)[0]; + } + else if (reader->mode == TUPLE_QUEUE_MODE_DATA) + { + /* Tuple data. */ + return TupleQueueHandleDataMessage(reader, nbytes, data); + } + else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL) + { + /* Control message, describing a transient record type. */ + TupleQueueHandleControlMessage(reader, nbytes, data); + } + else + elog(ERROR, "invalid mode: %d", (int) reader->mode); + } +} - memmove(&funnel->queue[funnel->nextqueue], - &funnel->queue[funnel->nextqueue + 1], - sizeof(shm_mq_handle *) - * (funnel->nqueues - funnel->nextqueue)); +/* + * Handle a data message - that is, a tuple - from the remote side. + */ +static HeapTuple +TupleQueueHandleDataMessage(TupleQueueReader *reader, + Size nbytes, + HeapTupleHeader data) +{ + HeapTupleData htup; - if (funnel->nextqueue >= funnel->nqueues) - funnel->nextqueue = 0; + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = nbytes; + htup.t_data = data; - if (funnel->nextqueue < waitpos) - --waitpos; + return TupleQueueRemapTuple(reader, reader->tupledesc, reader->remapinfo, + &htup); +} +/* + * Remap tuple typmods per control information received from remote side. + */ +static HeapTuple +TupleQueueRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, + RemapInfo * remapinfo, HeapTuple tuple) +{ + Datum *values; + bool *isnull; + bool dirty = false; + int i; + + /* + * If no remapping is necessary, just copy the tuple into a single + * palloc'd chunk, as caller will expect. + */ + if (remapinfo == NULL) + return heap_copytuple(tuple); + + /* Deform tuple so we can remap record typmods for individual attrs. */ + values = palloc(tupledesc->natts * sizeof(Datum)); + isnull = palloc(tupledesc->natts * sizeof(bool)); + heap_deform_tuple(tuple, tupledesc, values, isnull); + Assert(tupledesc->natts == remapinfo->natts); + + /* Recursively check each non-NULL attribute. */ + for (i = 0; i < tupledesc->natts; ++i) + { + if (isnull[i] || remapinfo->mapping[i] == TQUEUE_REMAP_NONE) + continue; + values[i] = TupleQueueRemap(reader, remapinfo->mapping[i], values[i]); + dirty = true; + } + + /* Reform the modified tuple. */ + return heap_form_tuple(tupledesc, values, isnull); +} + +/* + * Remap a value based on the specified remap class. + */ +static Datum +TupleQueueRemap(TupleQueueReader *reader, RemapClass remapclass, Datum value) +{ + check_stack_depth(); + + switch (remapclass) + { + case TQUEUE_REMAP_NONE: + /* caller probably shouldn't have called us at all, but... */ + return value; + + case TQUEUE_REMAP_ARRAY: + return TupleQueueRemapArray(reader, value); + + case TQUEUE_REMAP_RANGE: + return TupleQueueRemapRange(reader, value); + + case TQUEUE_REMAP_RECORD: + return TupleQueueRemapRecord(reader, value); + } +} + +/* + * Remap an array. + */ +static Datum +TupleQueueRemapArray(TupleQueueReader *reader, Datum value) +{ + ArrayType *arr = DatumGetArrayTypeP(value); + Oid typeid = ARR_ELEMTYPE(arr); + RemapClass remapclass; + int16 typlen; + bool typbyval; + char typalign; + Datum *elem_values; + bool *elem_nulls; + int num_elems; + int i; + + remapclass = GetRemapClass(typeid); + + /* + * If the elements of the array don't need to be walked, we shouldn't have + * been called in the first place: GetRemapClass should have returned NULL + * when asked about this array type. + */ + Assert(remapclass != TQUEUE_REMAP_NONE); + + /* Deconstruct the array. */ + get_typlenbyvalalign(typeid, &typlen, &typbyval, &typalign); + deconstruct_array(arr, typeid, typlen, typbyval, typalign, + &elem_values, &elem_nulls, &num_elems); + + /* Remap each element. */ + for (i = 0; i < num_elems; ++i) + if (!elem_nulls[i]) + elem_values[i] = TupleQueueRemap(reader, remapclass, + elem_values[i]); + + /* Reconstruct and return the array. */ + arr = construct_md_array(elem_values, elem_nulls, + ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr), + typeid, typlen, typbyval, typalign); + return PointerGetDatum(arr); +} + +/* + * Remap a range type. + */ +static Datum +TupleQueueRemapRange(TupleQueueReader *reader, Datum value) +{ + RangeType *range = DatumGetRangeType(value); + Oid typeid = RangeTypeGetOid(range); + RemapClass remapclass; + TypeCacheEntry *typcache; + RangeBound lower; + RangeBound upper; + bool empty; + + /* + * Extract the lower and upper bounds. As in tqueueWalkRange, some + * caching might be a good idea here. + */ + typcache = lookup_type_cache(typeid, TYPECACHE_RANGE_INFO); + if (typcache->rngelemtype == NULL) + elog(ERROR, "type %u is not a range type", typeid); + range_deserialize(typcache, range, &lower, &upper, &empty); + + /* Nothing to do for an empty range. */ + if (empty) + return value; + + /* + * If the range bounds don't need to be walked, we shouldn't have been + * called in the first place: GetRemapClass should have returned NULL when + * asked about this range type. + */ + remapclass = GetRemapClass(typeid); + Assert(remapclass != TQUEUE_REMAP_NONE); + + /* Remap each bound, if present. */ + if (!upper.infinite) + upper.val = TupleQueueRemap(reader, remapclass, upper.val); + if (!lower.infinite) + lower.val = TupleQueueRemap(reader, remapclass, lower.val); + + /* And reserialize. */ + range = range_serialize(typcache, &lower, &upper, empty); + return RangeTypeGetDatum(range); +} + +/* + * Remap a record. + */ +static Datum +TupleQueueRemapRecord(TupleQueueReader *reader, Datum value) +{ + HeapTupleHeader tup; + Oid typeid; + int typmod; + RecordTypemodMap *mapent; + TupleDesc tupledesc; + RemapInfo *remapinfo; + HeapTupleData htup; + HeapTuple atup; + + /* Fetch type OID and typemod. */ + tup = DatumGetHeapTupleHeader(value); + typeid = HeapTupleHeaderGetTypeId(tup); + typmod = HeapTupleHeaderGetTypMod(tup); + + /* If transient record, replace remote typmod with local typmod. */ + if (typeid == RECORDOID) + { + Assert(reader->typmodmap != NULL); + mapent = hash_search(reader->typmodmap, &typmod, + HASH_FIND, NULL); + if (mapent == NULL) + elog(ERROR, "found unrecognized remote typmod %d", typmod); + typmod = mapent->localtypmod; + } + + /* + * Fetch tupledesc and compute remap info. We should probably cache this + * so that we don't have to keep recomputing it. + */ + tupledesc = lookup_rowtype_tupdesc(typeid, typmod); + remapinfo = BuildRemapInfo(tupledesc); + DecrTupleDescRefCount(tupledesc); + + /* Remap tuple. */ + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = HeapTupleHeaderGetDatumLength(tup); + htup.t_data = tup; + atup = TupleQueueRemapTuple(reader, tupledesc, remapinfo, &htup); + HeapTupleHeaderSetTypeId(atup->t_data, typeid); + HeapTupleHeaderSetTypMod(atup->t_data, typmod); + HeapTupleHeaderSetDatumLength(atup->t_data, htup.t_len); + + /* And return the results. */ + return HeapTupleHeaderGetDatum(atup->t_data); +} + +/* + * Handle a control message from the tuple queue reader. + * + * Control messages are sent when the remote side is sending tuples that + * contain transient record types. We need to arrange to bless those + * record types locally and translate between remote and local typmods. + */ +static void +TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, + char *data) +{ + int natts; + int remotetypmod; + bool hasoid; + char *buf = data; + int rc = 0; + int i; + Form_pg_attribute *attrs; + MemoryContext oldcontext; + TupleDesc tupledesc; + RecordTypemodMap *mapent; + bool found; + + /* Extract remote typmod. */ + memcpy(&remotetypmod, &buf[rc], sizeof(int)); + rc += sizeof(int); + + /* Extract attribute count. */ + memcpy(&natts, &buf[rc], sizeof(int)); + rc += sizeof(int); + + /* Extract hasoid flag. */ + memcpy(&hasoid, &buf[rc], sizeof(bool)); + rc += sizeof(bool); + + /* Extract attribute details. */ + oldcontext = MemoryContextSwitchTo(CurTransactionContext); + attrs = palloc(natts * sizeof(Form_pg_attribute)); + for (i = 0; i < natts; ++i) + { + attrs[i] = palloc(sizeof(FormData_pg_attribute)); + memcpy(attrs[i], &buf[rc], sizeof(FormData_pg_attribute)); + rc += sizeof(FormData_pg_attribute); + } + MemoryContextSwitchTo(oldcontext); + + /* We should have read the whole message. */ + Assert(rc == nbytes); + + /* Construct TupleDesc. */ + tupledesc = CreateTupleDesc(natts, hasoid, attrs); + tupledesc = BlessTupleDesc(tupledesc); + + /* Create map if it doesn't exist already. */ + if (reader->typmodmap == NULL) + { + HASHCTL ctl; + + ctl.keysize = sizeof(int); + ctl.entrysize = sizeof(RecordTypemodMap); + ctl.hcxt = CurTransactionContext; + reader->typmodmap = hash_create("typmodmap hashtable", + 100, &ctl, HASH_ELEM | HASH_CONTEXT); + } + + /* Create map entry. */ + mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER, + &found); + if (found) + elog(ERROR, "duplicate message for typmod %d", + remotetypmod); + mapent->localtypmod = tupledesc->tdtypmod; + elog(DEBUG3, "mapping remote typmod %d to local typmod %d", + remotetypmod, tupledesc->tdtypmod); +} + +/* + * Build a mapping indicating what remapping class applies to each attribute + * described by a tupledesc. + */ +static RemapInfo * +BuildRemapInfo(TupleDesc tupledesc) +{ + RemapInfo *remapinfo; + Size size; + AttrNumber i; + bool noop = true; + StringInfoData buf; + + initStringInfo(&buf); + + size = offsetof(RemapInfo, mapping) + + sizeof(RemapClass) * tupledesc->natts; + remapinfo = MemoryContextAllocZero(TopMemoryContext, size); + remapinfo->natts = tupledesc->natts; + for (i = 0; i < tupledesc->natts; ++i) + { + Form_pg_attribute attr = tupledesc->attrs[i]; + + if (attr->attisdropped) + { + remapinfo->mapping[i] = TQUEUE_REMAP_NONE; continue; } - /* If we got a message, return it. */ - if (result == SHM_MQ_SUCCESS) - { - HeapTupleData htup; + remapinfo->mapping[i] = GetRemapClass(attr->atttypid); + if (remapinfo->mapping[i] != TQUEUE_REMAP_NONE) + noop = false; + } - /* - * The tuple data we just read from the queue is only valid until - * we again attempt to read from it. Copy the tuple into a single - * palloc'd chunk as callers will expect. - */ - ItemPointerSetInvalid(&htup.t_self); - htup.t_tableOid = InvalidOid; - htup.t_len = nbytes; - htup.t_data = data; - return heap_copytuple(&htup); + if (noop) + { + appendStringInfo(&buf, "noop"); + pfree(remapinfo); + remapinfo = NULL; + } + + return remapinfo; +} + +/* + * Determine the remap class assocociated with a particular data type. + * + * Transient record types need to have the typmod applied on the sending side + * replaced with a value on the receiving side that has the same meaning. + * + * Arrays, range types, and all record types (including named composite types) + * need to searched for transient record values buried within them. + * Surprisingly, a walker is required even when the indicated type is a + * composite type, because the actual value may be a compatible transient + * record type. + */ +static RemapClass +GetRemapClass(Oid typeid) +{ + RemapClass forceResult = TQUEUE_REMAP_NONE; + RemapClass innerResult = TQUEUE_REMAP_NONE; + + for (;;) + { + HeapTuple tup; + Form_pg_type typ; + + /* Simple cases. */ + if (typeid == RECORDOID) + { + innerResult = TQUEUE_REMAP_RECORD; + break; + } + if (typeid == RECORDARRAYOID) + { + innerResult = TQUEUE_REMAP_ARRAY; + break; + } + + /* Otherwise, we need a syscache lookup to figure it out. */ + tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typeid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for type %u", typeid); + typ = (Form_pg_type) GETSTRUCT(tup); + + /* Look through domains to underlying base type. */ + if (typ->typtype == TYPTYPE_DOMAIN) + { + typeid = typ->typbasetype; + ReleaseSysCache(tup); + continue; } /* - * If we've visited all of the queues, then we should either give up - * and return NULL (if we're in non-blocking mode) or wait for the - * process latch to be set (otherwise). + * Look through arrays to underlying base type, but the final return + * value must be either TQUEUE_REMAP_ARRAY or TQUEUE_REMAP_NONE. (If + * this is an array of integers, for example, we don't need to walk + * it.) */ - if (funnel->nextqueue == waitpos) + if (OidIsValid(typ->typelem) && typ->typlen == -1) { - if (nowait) - return NULL; - WaitLatch(MyLatch, WL_LATCH_SET, 0); - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); + typeid = typ->typelem; + ReleaseSysCache(tup); + if (forceResult == TQUEUE_REMAP_NONE) + forceResult = TQUEUE_REMAP_ARRAY; + continue; } + + /* + * Similarly, look through ranges to the underlying base type, but the + * final return value must be either TQUEUE_REMAP_RANGE or + * TQUEUE_REMAP_NONE. + */ + if (typ->typtype == TYPTYPE_RANGE) + { + ReleaseSysCache(tup); + if (forceResult == TQUEUE_REMAP_NONE) + forceResult = TQUEUE_REMAP_RANGE; + typeid = get_range_subtype(typeid); + continue; + } + + /* Walk composite types. Nothing else needs special handling. */ + if (typ->typtype == TYPTYPE_COMPOSITE) + innerResult = TQUEUE_REMAP_RECORD; + ReleaseSysCache(tup); + break; } + + if (innerResult != TQUEUE_REMAP_NONE && forceResult != TQUEUE_REMAP_NONE) + return forceResult; + return innerResult; } diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h index 6f8eb73c9a..6a668fa8a0 100644 --- a/src/include/executor/tqueue.h +++ b/src/include/executor/tqueue.h @@ -21,11 +21,11 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle); /* Use these to receive tuples from a shm_mq. */ -typedef struct TupleQueueFunnel TupleQueueFunnel; -extern TupleQueueFunnel *CreateTupleQueueFunnel(void); -extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel); -extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *); -extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait, - bool *done); +typedef struct TupleQueueReader TupleQueueReader; +extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle, + TupleDesc tupledesc); +extern void DestroyTupleQueueReader(TupleQueueReader *funnel); +extern HeapTuple TupleQueueReaderNext(TupleQueueReader *, + bool nowait, bool *done); #endif /* TQUEUE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 939bc0ed73..58ec889b2f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1963,7 +1963,9 @@ typedef struct GatherState PlanState ps; /* its first field is NodeTag */ bool initialized; struct ParallelExecutorInfo *pei; - struct TupleQueueFunnel *funnel; + int nreaders; + int nextreader; + struct TupleQueueReader **reader; TupleTableSlot *funnel_slot; bool need_to_scan_locally; } GatherState; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index feb821b409..03e1d2c1c7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2018,7 +2018,7 @@ TupleHashEntry TupleHashEntryData TupleHashIterator TupleHashTable -TupleQueueFunnel +TupleQueueReader TupleTableSlot Tuplesortstate Tuplestorestate