diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 6b8ed867d5..a01b46af14 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -46,7 +46,7 @@
 
 static TupleTableSlot *ExecGather(PlanState *pstate);
 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
-static HeapTuple gather_readnext(GatherState *gatherstate);
+static MinimalTuple gather_readnext(GatherState *gatherstate);
 static void ExecShutdownGatherWorkers(GatherState *node);
 
 
@@ -120,7 +120,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
      * Initialize funnel slot to same tuple descriptor as outer plan.
      */
     gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
-                                                      &TTSOpsHeapTuple);
+                                                      &TTSOpsMinimalTuple);
 
     /*
      * Gather doesn't support checking a qual (it's always more efficient to
@@ -266,7 +266,7 @@ gather_getnext(GatherState *gatherstate)
     PlanState  *outerPlan = outerPlanState(gatherstate);
     TupleTableSlot *outerTupleSlot;
     TupleTableSlot *fslot = gatherstate->funnel_slot;
-    HeapTuple   tup;
+    MinimalTuple tup;
 
     while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
     {
@@ -278,9 +278,9 @@
 
             if (HeapTupleIsValid(tup))
             {
-                ExecStoreHeapTuple(tup,     /* tuple to store */
-                                   fslot,   /* slot to store the tuple */
-                                   true);   /* pfree tuple when done with it */
+                ExecStoreMinimalTuple(tup,      /* tuple to store */
+                                      fslot,    /* slot to store the tuple */
+                                      false);   /* don't pfree tuple */
                 return fslot;
             }
         }
@@ -308,7 +308,7 @@
 /*
  * Attempt to read a tuple from one of our parallel workers.
  */
-static HeapTuple
+static MinimalTuple
 gather_readnext(GatherState *gatherstate)
 {
     int         nvisited = 0;
@@ -316,7 +316,7 @@
     for (;;)
     {
         TupleQueueReader *reader;
-        HeapTuple   tup;
+        MinimalTuple tup;
         bool        readerdone;
 
         /* Check for async events, particularly messages from workers. */
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 317ddb4ae2..47129344f3 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -45,7 +45,7 @@
  */
 typedef struct GMReaderTupleBuffer
 {
-    HeapTuple  *tuple;          /* array of length MAX_TUPLE_STORE */
+    MinimalTuple *tuple;        /* array of length MAX_TUPLE_STORE */
     int         nTuples;        /* number of tuples currently stored */
     int         readCounter;    /* index of next tuple to extract */
     bool        done;           /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
-                                   bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+                                      bool nowait, bool *done);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static void gather_merge_setup(GatherMergeState *gm_state);
 static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
     {
         /* Allocate the tuple array with length MAX_TUPLE_STORE */
         gm_state->gm_tuple_buffers[i].tuple =
-            (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+            (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
 
         /* Initialize tuple slot for worker */
         gm_state->gm_slots[i + 1] =
             ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
-                                   &TTSOpsHeapTuple);
+                                   &TTSOpsMinimalTuple);
     }
 
     /* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
         GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
 
         while (tuple_buffer->readCounter < tuple_buffer->nTuples)
-            heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+            pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
 
         ExecClearTuple(gm_state->gm_slots[i + 1]);
     }
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
     /* Try to fill additional slots in the array. */
     for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
     {
-        HeapTuple   tuple;
+        MinimalTuple tuple;
 
         tuple = gm_readnext_tuple(gm_state,
                                   reader,
                                   true,
                                   &tuple_buffer->done);
-        if (!HeapTupleIsValid(tuple))
+        if (!tuple)
             break;
         tuple_buffer->tuple[i] = tuple;
         tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
     GMReaderTupleBuffer *tuple_buffer;
-    HeapTuple   tup;
+    MinimalTuple tup;
 
     /*
      * If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
                                 reader,
                                 nowait,
                                 &tuple_buffer->done);
-        if (!HeapTupleIsValid(tup))
+        if (!tup)
             return false;
 
         /*
@@ -697,13 +697,13 @@
         load_tuple_array(gm_state, reader);
     }
 
-    Assert(HeapTupleIsValid(tup));
+    Assert(tup);
 
     /* Build the TupleTableSlot for the given tuple */
-    ExecStoreHeapTuple(tup,     /* tuple to store */
-                       gm_state->gm_slots[reader],  /* slot in which to store
-                                                     * the tuple */
-                       true);   /* pfree tuple when done with it */
+    ExecStoreMinimalTuple(tup,  /* tuple to store */
+                          gm_state->gm_slots[reader],   /* slot in which to store
+                                                         * the tuple */
+                          true);    /* pfree tuple when done with it */
 
     return true;
 }
@@ -711,12 +711,12 @@
 /*
  * Attempt to read a tuple from given worker.
  */
-static HeapTuple
+static MinimalTuple
 gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                   bool *done)
 {
     TupleQueueReader *reader;
-    HeapTuple   tup;
+    MinimalTuple tup;
 
     /* Check for async events, particularly messages from workers. */
     CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
     reader = gm_state->reader[nreader - 1];
     tup = TupleQueueReaderNext(reader, nowait, done);
 
-    return tup;
+    /*
+     * Since we'll be buffering these across multiple calls, we need to make a
+     * copy.
+     */
+    return tup ? heap_copy_minimal_tuple(tup) : NULL;
 }
 
 /*
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index e5656fbfac..30a264ebea 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -54,16 +54,16 @@ static bool
 tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 {
     TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
-    HeapTuple   tuple;
+    MinimalTuple tuple;
     shm_mq_result result;
     bool        should_free;
 
     /* Send the tuple itself. */
-    tuple = ExecFetchSlotHeapTuple(slot, true, &should_free);
-    result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
+    tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
+    result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
 
     if (should_free)
-        heap_freetuple(tuple);
+        pfree(tuple);
 
     /* Check for failure. */
     if (result == SHM_MQ_DETACHED)
@@ -164,18 +164,18 @@ DestroyTupleQueueReader(TupleQueueReader *reader)
  * nowait = true and no tuple is ready to return. *done, if not NULL,
  * is set to true when there are no remaining tuples and otherwise to false.
  *
- * The returned tuple, if any, is allocated in CurrentMemoryContext.
- * Note that this routine must not leak memory! (We used to allow that,
- * but not any more.)
+ * The returned tuple, if any, is either in shared memory or a private buffer
+ * and should not be freed. The pointer is invalid after the next call to
+ * TupleQueueReaderNext().
  *
  * Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, this can still
  * accumulate bytes from a partially-read message, so it's useful to call
  * this with nowait = true even if nothing is returned.
  */
-HeapTuple
+MinimalTuple
 TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 {
-    HeapTupleData htup;
+    MinimalTuple tuple;
     shm_mq_result result;
     Size        nbytes;
     void       *data;
@@ -200,13 +200,11 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
     Assert(result == SHM_MQ_SUCCESS);
 
     /*
-     * Set up a dummy HeapTupleData pointing to the data from the shm_mq
-     * (which had better be sufficiently aligned).
+     * Return a pointer to the queue memory directly (which had better be
+     * sufficiently aligned).
      */
-    ItemPointerSetInvalid(&htup.t_self);
-    htup.t_tableOid = InvalidOid;
-    htup.t_len = nbytes;
-    htup.t_data = data;
+    tuple = (MinimalTuple) data;
+    Assert(tuple->t_len == nbytes);
 
-    return heap_copytuple(&htup);
+    return tuple;
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 9941dfe65e..99278eed93 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1730,8 +1730,10 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
     List       *tlist;
 
     /*
-     * Although the Gather node can project, we prefer to push down such work
-     * to its child node, so demand an exact tlist from the child.
+     * Push projection down to the child node. That way, the projection work
+     * is parallelized, and there can be no system columns in the result (they
+     * can't travel through a tuple queue because it uses MinimalTuple
+     * representation).
      */
     subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
@@ -1766,7 +1768,7 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
     List       *pathkeys = best_path->path.pathkeys;
     List       *tlist = build_path_tlist(root, &best_path->path);
 
-    /* As with Gather, it's best to project away columns in the workers. */
+    /* As with Gather, project away columns in the workers. */
     subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);
 
     /* Create a shell for a GatherMerge plan. */
diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h
index 93655ef6bd..264eb56641 100644
--- a/src/include/executor/tqueue.h
+++ b/src/include/executor/tqueue.h
@@ -26,7 +26,7 @@ extern DestReceiver *CreateTupleQueueDestReceiver(shm_mq_handle *handle);
 /* Use these to receive tuples from a shm_mq. */
 extern TupleQueueReader *CreateTupleQueueReader(shm_mq_handle *handle);
 extern void DestroyTupleQueueReader(TupleQueueReader *reader);
-extern HeapTuple TupleQueueReaderNext(TupleQueueReader *reader,
-                                      bool nowait, bool *done);
+extern MinimalTuple TupleQueueReaderNext(TupleQueueReader *reader,
+                                         bool nowait, bool *done);
 
 #endif                          /* TQUEUE_H */
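
Note for reviewers, not part of the patch: the sketch below illustrates the reader-side contract this change establishes. The helper consume_one_tuple() and its arguments are hypothetical names; the functions it calls (TupleQueueReaderNext, ExecStoreMinimalTuple, heap_copy_minimal_tuple) are the ones touched above, and the slot is assumed to have been created with &TTSOpsMinimalTuple, as the Gather and GatherMerge nodes now do.

/*
 * Illustrative sketch only, not part of the patch. Shows how a caller
 * consumes tuples under the new TupleQueueReaderNext() contract inside the
 * PostgreSQL backend. consume_one_tuple() is a hypothetical helper.
 */
#include "postgres.h"

#include "access/htup_details.h"
#include "executor/tqueue.h"
#include "executor/tuptable.h"

static bool
consume_one_tuple(TupleQueueReader *reader, TupleTableSlot *slot)
{
    MinimalTuple tup;
    bool        done;

    /* The returned pointer refers to queue memory; do not pfree it. */
    tup = TupleQueueReaderNext(reader, false, &done);
    if (tup == NULL)
        return false;           /* no more tuples from this worker */

    /*
     * Use it immediately, without shouldFree, as gather_getnext() now does.
     * The pointer is only valid until the next TupleQueueReaderNext() call,
     * so a caller that buffers tuples (as gm_readnext_tuple() does) must
     * copy them with heap_copy_minimal_tuple() and pfree the copies itself.
     */
    ExecStoreMinimalTuple(tup, slot, false);
    return true;
}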