Add shared tuplestores.

SharedTuplestore allows multiple participants to write into it and
then read the tuples back from it in parallel.  Each reader receives
partial results.

For now it always uses disk files, but other buffering policies and
other kinds of scans (i.e., each reader receives complete results) may be
useful in the future.

The upcoming parallel hash join feature will use this facility.
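
For illustration only, a minimal sketch of the intended calling pattern follows.  The sts_demo() wrapper, its arguments, and the surrounding DSM and barrier setup are assumptions made for the example; only the sts_* calls themselves come from this patch.

#include "postgres.h"

#include "access/htup.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"

/*
 * Sketch: "sts" must point to sts_estimate(nparticipants) bytes of shared
 * memory and "fileset" to a SharedFileSet, both already visible to every
 * participant (for example via a DSM segment set up by the leader).
 */
static void
sts_demo(SharedTuplestore *sts, SharedFileSet *fileset,
		 int nparticipants, int my_number, MinimalTuple tup)
{
	SharedTuplestoreAccessor *acc;
	uint32		hash = 42;		/* example per-tuple meta-data */
	MinimalTuple scanned;

	/*
	 * Participant 0 (the leader) initializes the shared state before the
	 * workers start; the workers attach to it.
	 */
	if (my_number == 0)
		acc = sts_initialize(sts, nparticipants, my_number,
							 sizeof(uint32), 0, fileset, "demo");
	else
		acc = sts_attach(sts, my_number, fileset);

	/* Any participant may write tuples, and must then stop writing. */
	sts_puttuple(acc, &hash, tup);
	sts_end_write(acc);

	/* (A barrier is needed here so nobody scans before all writes end.) */

	/* Each participant then reads back an arbitrary subset of the tuples. */
	sts_begin_parallel_scan(acc);
	while ((scanned = sts_parallel_scan_next(acc, &hash)) != NULL)
	{
		/* process "scanned"; its meta-data was copied into "hash" */
	}
	sts_end_parallel_scan(acc);
}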

Author: Thomas Munro
Reviewed-By: Peter Geoghegan, Andres Freund, Robert Haas
Discussion: https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
Andres Freund 2017-12-18 14:23:19 -08:00
parent 25d532698d
commit ab9e0e718a
6 changed files with 699 additions and 1 deletion

View File

@@ -516,6 +516,8 @@ RegisterLWLockTranches(void)
 	LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
 						  "session_record_table");
 	LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
 						  "session_typmod_table");
+	LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
+						  "shared_tuplestore");
 	LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
 	LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");

View File

@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS)
-OBJS = logtape.o sortsupport.o tuplesort.o tuplestore.o
+OBJS = logtape.o sharedtuplestore.o sortsupport.o tuplesort.o tuplestore.o
 tuplesort.o: qsort_tuple.c

View File

@@ -0,0 +1,633 @@
/*-------------------------------------------------------------------------
*
* sharedtuplestore.c
* Simple mechanism for sharing tuples between backends.
*
* This module contains a shared temporary tuple storage mechanism providing
* a parallel-aware subset of the features of tuplestore.c. Multiple backends
* can write to a SharedTuplestore, and then multiple backends can later scan
* the stored tuples. Currently, the only scan type supported is a parallel
* scan where each backend reads an arbitrary subset of the tuples that were
* written.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/utils/sort/sharedtuplestore.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "miscadmin.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
#include "storage/sharedfileset.h"
#include "utils/sharedtuplestore.h"
#include <limits.h>
/*
* The size of chunks, in pages. This is somewhat arbitrarily set to match
* the size of HASH_CHUNK, so that Parallel Hash obtains new chunks of tuples
* at approximately the same rate as it allocates new chunks of memory to
* insert them into.
*/
#define STS_CHUNK_PAGES 4
#define STS_CHUNK_HEADER_SIZE offsetof(SharedTuplestoreChunk, data)
#define STS_CHUNK_DATA_SIZE (STS_CHUNK_PAGES * BLCKSZ - STS_CHUNK_HEADER_SIZE)
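/*
 * For a concrete sense of scale (illustrative, assuming the default BLCKSZ
 * of 8192 and the usual 8-byte layout of the two-int chunk header below):
 * each chunk occupies 4 * 8192 = 32768 bytes on disk, of which 32760 bytes
 * are available for meta-data and tuple data.
 */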
/* Chunk written to disk. */
typedef struct SharedTuplestoreChunk
{
int ntuples; /* Number of tuples in this chunk. */
int overflow; /* If overflow, how many including this one? */
char data[FLEXIBLE_ARRAY_MEMBER];
} SharedTuplestoreChunk;
/* Per-participant shared state. */
typedef struct SharedTuplestoreParticipant
{
LWLock lock;
BlockNumber read_page; /* Page number for next read. */
BlockNumber npages; /* Number of pages written. */
bool writing; /* Used only for assertions. */
} SharedTuplestoreParticipant;
/* The control object that lives in shared memory. */
struct SharedTuplestore
{
int nparticipants; /* Number of participants that can write. */
int flags; /* Flag bits from SHARED_TUPLESTORE_XXX */
size_t meta_data_size; /* Size of per-tuple header. */
char name[NAMEDATALEN]; /* A name for this tuplestore. */
/* Followed by per-participant shared state. */
SharedTuplestoreParticipant participants[FLEXIBLE_ARRAY_MEMBER];
};
/* Per-participant state that lives in backend-local memory. */
struct SharedTuplestoreAccessor
{
int participant; /* My participant number. */
SharedTuplestore *sts; /* The shared state. */
SharedFileSet *fileset; /* The SharedFileSet holding files. */
MemoryContext context; /* Memory context for buffers. */
/* State for reading. */
int read_participant; /* The current participant to read from. */
BufFile *read_file; /* The current file to read from. */
int read_ntuples_available; /* The number of tuples in chunk. */
int read_ntuples; /* How many tuples have we read from chunk? */
size_t read_bytes; /* How many bytes have we read from chunk? */
char *read_buffer; /* A buffer for loading tuples. */
size_t read_buffer_size;
BlockNumber read_next_page; /* Lowest block we'll consider reading. */
/* State for writing. */
SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
BufFile *write_file; /* The current file to write to. */
BlockNumber write_page; /* The next page to write to. */
char *write_pointer; /* Current write pointer within chunk. */
char *write_end; /* One past the end of the current chunk. */
};
static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
int participant);
/*
* Return the amount of shared memory required to hold SharedTuplestore for a
* given number of participants.
*/
size_t
sts_estimate(int participants)
{
return offsetof(SharedTuplestore, participants) +
sizeof(SharedTuplestoreParticipant) * participants;
}
/*
* Initialize a SharedTuplestore in existing shared memory. There must be
* space for sts_estimate(participants) bytes. If flags includes the value
* SHARED_TUPLESTORE_SINGLE_PASS, the files may in future be removed more
* eagerly (but this isn't yet implemented).
*
* Tuples that are stored may optionally carry a piece of fixed sized
* meta-data which will be retrieved along with the tuple. This is useful for
* the hash values used in multi-batch hash joins, but could have other
* applications.
*
* The caller must supply a SharedFileSet, which is essentially a directory
* that will be cleaned up automatically, and a name which must be unique
* across all SharedTuplestores created in the same SharedFileSet.
*/
SharedTuplestoreAccessor *
sts_initialize(SharedTuplestore *sts, int participants,
int my_participant_number,
size_t meta_data_size,
int flags,
SharedFileSet *fileset,
const char *name)
{
SharedTuplestoreAccessor *accessor;
int i;
Assert(my_participant_number < participants);
sts->nparticipants = participants;
sts->meta_data_size = meta_data_size;
sts->flags = flags;
if (strlen(name) > sizeof(sts->name) - 1)
elog(ERROR, "SharedTuplestore name too long");
strcpy(sts->name, name);
/*
* Limit meta-data so it + tuple size always fits into a single chunk.
* sts_puttuple() and sts_read_tuple() could be made to support scenarios
* where that's not the case, but it's not currently required. If so,
* meta-data size probably should be made variable, too.
*/
if (meta_data_size + sizeof(uint32) >= STS_CHUNK_DATA_SIZE)
elog(ERROR, "meta-data too long");
for (i = 0; i < participants; ++i)
{
LWLockInitialize(&sts->participants[i].lock,
LWTRANCHE_SHARED_TUPLESTORE);
sts->participants[i].read_page = 0;
sts->participants[i].writing = false;
}
accessor = palloc0(sizeof(SharedTuplestoreAccessor));
accessor->participant = my_participant_number;
accessor->sts = sts;
accessor->fileset = fileset;
accessor->context = CurrentMemoryContext;
return accessor;
}
/*
* Attach to a SharedTupleStore that has been initialized by another backend,
* so that this backend can read and write tuples.
*/
SharedTuplestoreAccessor *
sts_attach(SharedTuplestore *sts,
int my_participant_number,
SharedFileSet *fileset)
{
SharedTuplestoreAccessor *accessor;
Assert(my_participant_number < sts->nparticipants);
accessor = palloc0(sizeof(SharedTuplestoreAccessor));
accessor->participant = my_participant_number;
accessor->sts = sts;
accessor->fileset = fileset;
accessor->context = CurrentMemoryContext;
return accessor;
}
static void
sts_flush_chunk(SharedTuplestoreAccessor *accessor)
{
size_t size;
size_t written;
size = STS_CHUNK_PAGES * BLCKSZ;
written = BufFileWrite(accessor->write_file, accessor->write_chunk, size);
if (written != size)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to temporary file: %m")));
memset(accessor->write_chunk, 0, size);
accessor->write_pointer = &accessor->write_chunk->data[0];
accessor->sts->participants[accessor->participant].npages +=
STS_CHUNK_PAGES;
}
/*
* Finish writing tuples. This must be called by all backends that have
* written data before any backend begins reading it.
*/
void
sts_end_write(SharedTuplestoreAccessor *accessor)
{
if (accessor->write_file != NULL)
{
sts_flush_chunk(accessor);
BufFileClose(accessor->write_file);
pfree(accessor->write_chunk);
accessor->write_chunk = NULL;
accessor->write_file = NULL;
accessor->sts->participants[accessor->participant].writing = false;
}
}
/*
* Prepare to rescan. Only one participant must call this. After it returns,
* all participants may call sts_begin_parallel_scan() and then loop over
* sts_parallel_scan_next(). This function must not be called concurrently
* with a scan, and synchronization to avoid that is the caller's
* responsibility.
*/
void
sts_reinitialize(SharedTuplestoreAccessor *accessor)
{
int i;
/*
* Reset the shared read head for all participants' files, so that a new
* scan will start from the beginning of each file.
*/
for (i = 0; i < accessor->sts->nparticipants; ++i)
{
accessor->sts->participants[i].read_page = 0;
}
}
/*
* Begin scanning the contents in parallel.
*/
void
sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
{
int i PG_USED_FOR_ASSERTS_ONLY;
/* End any existing scan that was in progress. */
sts_end_parallel_scan(accessor);
/*
* Any backend that might have written into this shared tuplestore must
* have called sts_end_write(), so that all buffers are flushed and the
* files have stopped growing.
*/
for (i = 0; i < accessor->sts->nparticipants; ++i)
Assert(!accessor->sts->participants[i].writing);
/*
* We will start out reading the file that THIS backend wrote. There may
* be some caching locality advantage to that.
*/
accessor->read_participant = accessor->participant;
accessor->read_file = NULL;
accessor->read_next_page = 0;
}
/*
* Finish a parallel scan, freeing associated backend-local resources.
*/
void
sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
{
/*
* Here we could delete all files if SHARED_TUPLESTORE_SINGLE_PASS, but
* we'd probably need a reference count of current parallel scanners so we
* could safely do it only when the reference count reaches zero.
*/
if (accessor->read_file != NULL)
{
BufFileClose(accessor->read_file);
accessor->read_file = NULL;
}
}
/*
* Write a tuple. If a meta-data size was provided to sts_initialize, then a
* pointer to meta data of that size must be provided.
*/
void
sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
MinimalTuple tuple)
{
size_t size;
/* Do we have our own file yet? */
if (accessor->write_file == NULL)
{
SharedTuplestoreParticipant *participant;
char name[MAXPGPATH];
/* Create one. Only this backend will write into it. */
sts_filename(name, accessor, accessor->participant);
accessor->write_file = BufFileCreateShared(accessor->fileset, name);
/* Set up the shared state for this backend's file. */
participant = &accessor->sts->participants[accessor->participant];
participant->writing = true; /* for assertions only */
}
/* Do we have space? */
size = accessor->sts->meta_data_size + tuple->t_len;
if (accessor->write_pointer + size >= accessor->write_end)
{
if (accessor->write_chunk == NULL)
{
/* First time through. Allocate chunk. */
accessor->write_chunk = (SharedTuplestoreChunk *)
MemoryContextAllocZero(accessor->context,
STS_CHUNK_PAGES * BLCKSZ);
accessor->write_chunk->ntuples = 0;
accessor->write_pointer = &accessor->write_chunk->data[0];
accessor->write_end = (char *)
accessor->write_chunk + STS_CHUNK_PAGES * BLCKSZ;
}
else
{
/* See if flushing helps. */
sts_flush_chunk(accessor);
}
/* It may still not be enough in the case of a gigantic tuple. */
if (accessor->write_pointer + size >= accessor->write_end)
{
size_t written;
/*
* We'll write the beginning of the oversized tuple, and then
* write the rest in some number of 'overflow' chunks.
*
* sts_initialize() verifies that the size of the tuple +
* meta-data always fits into a chunk. Because the chunk has been
* flushed above, we can be sure to have all of a chunk's usable
* space available.
*/
Assert(accessor->write_pointer + accessor->sts->meta_data_size +
sizeof(uint32) < accessor->write_end);
/* Write the meta-data as one chunk. */
if (accessor->sts->meta_data_size > 0)
memcpy(accessor->write_pointer, meta_data,
accessor->sts->meta_data_size);
/*
* Write as much of the tuple as we can fit. This includes the
* tuple's size at the start.
*/
written = accessor->write_end - accessor->write_pointer -
accessor->sts->meta_data_size;
memcpy(accessor->write_pointer + accessor->sts->meta_data_size,
tuple, written);
++accessor->write_chunk->ntuples;
size -= accessor->sts->meta_data_size;
size -= written;
/* Now write as many overflow chunks as we need for the rest. */
while (size > 0)
{
size_t written_this_chunk;
sts_flush_chunk(accessor);
/*
* How many overflow chunks to go? This will allow readers to
* skip all of them at once instead of reading each one.
*/
accessor->write_chunk->overflow = (size + STS_CHUNK_DATA_SIZE - 1) /
STS_CHUNK_DATA_SIZE;
written_this_chunk =
Min(accessor->write_end - accessor->write_pointer, size);
memcpy(accessor->write_pointer, (char *) tuple + written,
written_this_chunk);
accessor->write_pointer += written_this_chunk;
size -= written_this_chunk;
written += written_this_chunk;
}
return;
}
}
/* Copy meta-data and tuple into buffer. */
if (accessor->sts->meta_data_size > 0)
memcpy(accessor->write_pointer, meta_data,
accessor->sts->meta_data_size);
memcpy(accessor->write_pointer + accessor->sts->meta_data_size, tuple,
tuple->t_len);
accessor->write_pointer += size;
++accessor->write_chunk->ntuples;
}
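/*
 * Worked example (illustrative, assuming the default BLCKSZ of 8192 so that
 * STS_CHUNK_DATA_SIZE is 32760 bytes): writing a tuple with t_len = 100000
 * and 4 bytes of meta-data into a freshly flushed chunk stores the 4 bytes
 * of meta-data plus the first 32756 bytes of the tuple there; the remaining
 * 67244 bytes then require ceil(67244 / 32760) = 3 overflow chunks, which a
 * reader can skip in a single step using the overflow count in the first
 * overflow chunk's header.
 */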
static MinimalTuple
sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
{
MinimalTuple tuple;
uint32 size;
size_t remaining_size;
size_t this_chunk_size;
char *destination;
/*
* We'll keep track of bytes read from this chunk so that we can detect an
* overflowing tuple and switch to reading overflow pages.
*/
if (accessor->sts->meta_data_size > 0)
{
if (BufFileRead(accessor->read_file,
meta_data,
accessor->sts->meta_data_size) !=
accessor->sts->meta_data_size)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading meta-data.")));
accessor->read_bytes += accessor->sts->meta_data_size;
}
if (BufFileRead(accessor->read_file,
&size,
sizeof(size)) != sizeof(size))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading size.")));
accessor->read_bytes += sizeof(size);
if (size > accessor->read_buffer_size)
{
size_t new_read_buffer_size;
if (accessor->read_buffer != NULL)
pfree(accessor->read_buffer);
new_read_buffer_size = Max(size, accessor->read_buffer_size * 2);
accessor->read_buffer =
MemoryContextAlloc(accessor->context, new_read_buffer_size);
accessor->read_buffer_size = new_read_buffer_size;
}
remaining_size = size - sizeof(uint32);
this_chunk_size = Min(remaining_size,
BLCKSZ * STS_CHUNK_PAGES - accessor->read_bytes);
destination = accessor->read_buffer + sizeof(uint32);
if (BufFileRead(accessor->read_file,
destination,
this_chunk_size) != this_chunk_size)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading tuple.")));
accessor->read_bytes += this_chunk_size;
remaining_size -= this_chunk_size;
destination += this_chunk_size;
++accessor->read_ntuples;
/* Check if we need to read any overflow chunks. */
while (remaining_size > 0)
{
/* We are now positioned at the start of an overflow chunk. */
SharedTuplestoreChunk chunk_header;
if (BufFileRead(accessor->read_file, &chunk_header, STS_CHUNK_HEADER_SIZE) !=
STS_CHUNK_HEADER_SIZE)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading overflow chunk header.")));
accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
if (chunk_header.overflow == 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("unexpected chunk in shared tuplestore temporary file"),
errdetail_internal("Expected overflow chunk.")));
accessor->read_next_page += STS_CHUNK_PAGES;
this_chunk_size = Min(remaining_size,
BLCKSZ * STS_CHUNK_PAGES -
STS_CHUNK_HEADER_SIZE);
if (BufFileRead(accessor->read_file,
destination,
this_chunk_size) != this_chunk_size)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading tuple.")));
accessor->read_bytes += this_chunk_size;
remaining_size -= this_chunk_size;
destination += this_chunk_size;
/*
* These will be used to count regular tuples following the oversized
* tuple that spilled into this overflow chunk.
*/
accessor->read_ntuples = 0;
accessor->read_ntuples_available = chunk_header.ntuples;
}
tuple = (MinimalTuple) accessor->read_buffer;
tuple->t_len = size;
return tuple;
}
/*
* Get the next tuple in the current parallel scan.
*/
MinimalTuple
sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
{
SharedTuplestoreParticipant *p;
BlockNumber read_page;
bool eof;
for (;;)
{
/* Can we read more tuples from the current chunk? */
if (accessor->read_ntuples < accessor->read_ntuples_available)
return sts_read_tuple(accessor, meta_data);
/* Find the location of a new chunk to read. */
p = &accessor->sts->participants[accessor->read_participant];
LWLockAcquire(&p->lock, LW_EXCLUSIVE);
/* We can skip directly past overflow pages we know about. */
if (p->read_page < accessor->read_next_page)
p->read_page = accessor->read_next_page;
eof = p->read_page >= p->npages;
if (!eof)
{
/* Claim the next chunk. */
read_page = p->read_page;
/* Advance the read head for the next reader. */
p->read_page += STS_CHUNK_PAGES;
accessor->read_next_page = p->read_page;
}
LWLockRelease(&p->lock);
if (!eof)
{
SharedTuplestoreChunk chunk_header;
/* Make sure we have the file open. */
if (accessor->read_file == NULL)
{
char name[MAXPGPATH];
sts_filename(name, accessor, accessor->read_participant);
accessor->read_file =
BufFileOpenShared(accessor->fileset, name);
}
/* Seek and load the chunk header. */
if (BufFileSeekBlock(accessor->read_file, read_page) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Could not seek to next block.")));
if (BufFileRead(accessor->read_file, &chunk_header,
STS_CHUNK_HEADER_SIZE) != STS_CHUNK_HEADER_SIZE)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from shared tuplestore temporary file"),
errdetail_internal("Short read while reading chunk header.")));
/*
* If this is an overflow chunk, we skip it and any following
* overflow chunks all at once.
*/
if (chunk_header.overflow > 0)
{
accessor->read_next_page = read_page +
chunk_header.overflow * STS_CHUNK_PAGES;
continue;
}
accessor->read_ntuples = 0;
accessor->read_ntuples_available = chunk_header.ntuples;
accessor->read_bytes = STS_CHUNK_HEADER_SIZE;
/* Go around again, so we can get a tuple from this chunk. */
}
else
{
if (accessor->read_file != NULL)
{
BufFileClose(accessor->read_file);
accessor->read_file = NULL;
}
/*
* Try the next participant's file. If we've gone full circle,
* we're done.
*/
accessor->read_participant = (accessor->read_participant + 1) %
accessor->sts->nparticipants;
if (accessor->read_participant == accessor->participant)
break;
accessor->read_next_page = 0;
/* Go around again, so we can get a chunk from this file. */
}
}
return NULL;
}
/*
* Create the name used for the BufFile that a given participant will write.
*/
static void
sts_filename(char *name, SharedTuplestoreAccessor *accessor, int participant)
{
snprintf(name, MAXPGPATH, "%s.p%d", accessor->sts->name, participant);
}
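/*
 * For example, participant 3 of a SharedTuplestore that was given the name
 * "demo" writes its tuples to a BufFile called "demo.p3" within the
 * SharedFileSet.  ("demo" is just an illustrative name.)
 */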

View File

@@ -215,6 +215,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SESSION_DSA,
 	LWTRANCHE_SESSION_RECORD_TABLE,
 	LWTRANCHE_SESSION_TYPMOD_TABLE,
+	LWTRANCHE_SHARED_TUPLESTORE,
 	LWTRANCHE_TBM,
 	LWTRANCHE_PARALLEL_APPEND,
 	LWTRANCHE_FIRST_USER_DEFINED

View File

@@ -0,0 +1,60 @@
/*-------------------------------------------------------------------------
*
* sharedtuplestore.h
* Simple mechanism for sharing tuples between backends.
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/utils/sharedtuplestore.h
*
*-------------------------------------------------------------------------
*/
#ifndef SHAREDTUPLESTORE_H
#define SHAREDTUPLESTORE_H
#include "storage/fd.h"
#include "storage/sharedfileset.h"
struct SharedTuplestore;
typedef struct SharedTuplestore SharedTuplestore;
struct SharedTuplestoreAccessor;
typedef struct SharedTuplestoreAccessor SharedTuplestoreAccessor;
/*
* A flag indicating that the tuplestore will only be scanned once, so backing
* files can be unlinked early.
*/
#define SHARED_TUPLESTORE_SINGLE_PASS 0x01
extern size_t sts_estimate(int participants);
extern SharedTuplestoreAccessor *sts_initialize(SharedTuplestore *sts,
int participants,
int my_participant_number,
size_t meta_data_size,
int flags,
SharedFileSet *fileset,
const char *name);
extern SharedTuplestoreAccessor *sts_attach(SharedTuplestore *sts,
int my_participant_number,
SharedFileSet *fileset);
extern void sts_end_write(SharedTuplestoreAccessor *accessor);
extern void sts_reinitialize(SharedTuplestoreAccessor *accessor);
extern void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor);
extern void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor);
extern void sts_puttuple(SharedTuplestoreAccessor *accessor,
void *meta_data,
MinimalTuple tuple);
extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
void *meta_data);
#endif /* SHAREDTUPLESTORE_H */

View File

@@ -2038,6 +2038,8 @@ SharedRecordTableEntry
 SharedRecordTableKey
 SharedRecordTypmodRegistry
 SharedSortInfo
+SharedTuplestore
+SharedTuplestoreAccessor
 SharedTypmodTableEntry
 ShellTypeInfo
 ShippableCacheEntry