diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile index fb27944b89..f130b6e350 100644 --- a/src/backend/access/common/Makefile +++ b/src/backend/access/common/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = bufmask.o heaptuple.o indextuple.o printsimple.o printtup.o \ - reloptions.o scankey.o tupconvert.o tupdesc.o + reloptions.o scankey.o session.o tupconvert.o tupdesc.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/common/session.c b/src/backend/access/common/session.c new file mode 100644 index 0000000000..865999b063 --- /dev/null +++ b/src/backend/access/common/session.c @@ -0,0 +1,208 @@ +/*------------------------------------------------------------------------- + * + * session.c + * Encapsulation of user session. + * + * This is intended to contain data that needs to be shared between backends + * performing work for a client session. In particular such a session is + * shared between the leader and worker processes for parallel queries. At + * some later point it might also become useful infrastructure for separating + * backends from client connections, e.g. for the purpose of pooling. + * + * Currently this infrastructure is used to share: + * - typemod registry for ephemeral row-types, i.e. BlessTupleDesc etc. + * + * Portions Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/backend/access/common/session.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/session.h" +#include "storage/lwlock.h" +#include "storage/shm_toc.h" +#include "utils/memutils.h" +#include "utils/typcache.h" + +/* Magic number for per-session DSM TOC. */ +#define SESSION_MAGIC 0xabb0fbc9 + +/* + * We want to create a DSA area to store shared state that has the same + * lifetime as a session. So far, it's only used to hold the shared record + * type registry. We don't want it to have to create any DSM segments just + * yet in common cases, so we'll give it enough space to hold a very small + * SharedRecordTypmodRegistry. + */ +#define SESSION_DSA_SIZE 0x30000 + +/* + * Magic numbers for state sharing in the per-session DSM area. + */ +#define SESSION_KEY_DSA UINT64CONST(0xFFFFFFFFFFFF0001) +#define SESSION_KEY_RECORD_TYPMOD_REGISTRY UINT64CONST(0xFFFFFFFFFFFF0002) + +/* This backend's current session. */ +Session *CurrentSession = NULL; + +/* + * Set up CurrentSession to point to an empty Session object. + */ +void +InitializeSession(void) +{ + CurrentSession = MemoryContextAllocZero(TopMemoryContext, sizeof(Session)); +} + +/* + * Initialize the per-session DSM segment if it isn't already initialized, and + * return its handle so that worker processes can attach to it. + * + * Unlike the per-context DSM segment, this segement and its contents are + * reused for future parallel queries. + * + * Return DSM_HANDLE_INVALID if a segment can't be allocated due to lack of + * resources. + */ +dsm_handle +GetSessionDsmHandle(void) +{ + shm_toc_estimator estimator; + shm_toc *toc; + dsm_segment *seg; + size_t typmod_registry_size; + size_t size; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + /* + * If we have already created a session-scope DSM segment in this backend, + * return its handle. The same segment will be used for the rest of this + * backend's lifetime. + */ + if (CurrentSession->segment != NULL) + return dsm_segment_handle(CurrentSession->segment); + + /* Otherwise, prepare to set one up. */ + old_context = MemoryContextSwitchTo(TopMemoryContext); + shm_toc_initialize_estimator(&estimator); + + /* Estimate space for the per-session DSA area. */ + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, SESSION_DSA_SIZE); + + /* Estimate space for the per-session record typmod registry. */ + typmod_registry_size = SharedRecordTypmodRegistryEstimate(); + shm_toc_estimate_keys(&estimator, 1); + shm_toc_estimate_chunk(&estimator, typmod_registry_size); + + /* Set up segment and TOC. */ + size = shm_toc_estimate(&estimator); + seg = dsm_create(size, DSM_CREATE_NULL_IF_MAXSEGMENTS); + if (seg == NULL) + { + MemoryContextSwitchTo(old_context); + + return DSM_HANDLE_INVALID; + } + toc = shm_toc_create(SESSION_MAGIC, + dsm_segment_address(seg), + size); + + /* Create per-session DSA area. */ + dsa_space = shm_toc_allocate(toc, SESSION_DSA_SIZE); + dsa = dsa_create_in_place(dsa_space, + SESSION_DSA_SIZE, + LWTRANCHE_SESSION_DSA, + seg); + shm_toc_insert(toc, SESSION_KEY_DSA, dsa_space); + + + /* Create session-scoped shared record typmod registry. */ + typmod_registry_space = shm_toc_allocate(toc, typmod_registry_size); + SharedRecordTypmodRegistryInit((SharedRecordTypmodRegistry *) + typmod_registry_space, seg, dsa); + shm_toc_insert(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, + typmod_registry_space); + + /* + * If we got this far, we can pin the shared memory so it stays mapped for + * the rest of this backend's life. If we don't make it this far, cleanup + * callbacks for anything we installed above (ie currently + * SharedRecordTypemodRegistry) will run when the DSM segment is detached + * by CurrentResourceOwner so we aren't left with a broken CurrentSession. + */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + /* Make segment and area available via CurrentSession. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + MemoryContextSwitchTo(old_context); + + return dsm_segment_handle(seg); +} + +/* + * Attach to a per-session DSM segment provided by a parallel leader. + */ +void +AttachSession(dsm_handle handle) +{ + dsm_segment *seg; + shm_toc *toc; + void *dsa_space; + void *typmod_registry_space; + dsa_area *dsa; + MemoryContext old_context; + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Attach to the DSM segment. */ + seg = dsm_attach(handle); + if (seg == NULL) + elog(ERROR, "could not attach to per-session DSM segment"); + toc = shm_toc_attach(SESSION_MAGIC, dsm_segment_address(seg)); + + /* Attach to the DSA area. */ + dsa_space = shm_toc_lookup(toc, SESSION_KEY_DSA, false); + dsa = dsa_attach_in_place(dsa_space, seg); + + /* Make them available via the current session. */ + CurrentSession->segment = seg; + CurrentSession->area = dsa; + + /* Attach to the shared record typmod registry. */ + typmod_registry_space = + shm_toc_lookup(toc, SESSION_KEY_RECORD_TYPMOD_REGISTRY, false); + SharedRecordTypmodRegistryAttach((SharedRecordTypmodRegistry *) + typmod_registry_space); + + /* Remain attached until end of backend or DetachSession(). */ + dsm_pin_mapping(seg); + dsa_pin_mapping(dsa); + + MemoryContextSwitchTo(old_context); +} + +/* + * Detach from the current session DSM segment. It's not strictly necessary + * to do this explicitly since we'll detach automatically at backend exit, but + * if we ever reuse parallel workers it will become important for workers to + * detach from one session before attaching to another. Note that this runs + * detach hooks. + */ +void +DetachSession(void) +{ + /* Runs detach hooks. */ + dsm_detach(CurrentSession->segment); + CurrentSession->segment = NULL; + dsa_detach(CurrentSession->area); + CurrentSession->area = NULL; +} diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 4436c86361..9e37ca73a8 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -184,6 +184,22 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) return desc; } +/* + * TupleDescCopy + * Copy a tuple descriptor into caller-supplied memory. + * The memory may be shared memory mapped at any address, and must + * be sufficient to hold TupleDescSize(src) bytes. + * + * !!! Constraints and defaults are not copied !!! + */ +void +TupleDescCopy(TupleDesc dst, TupleDesc src) +{ + memcpy(dst, src, TupleDescSize(src)); + dst->constr = NULL; + dst->tdrefcount = -1; +} + /* * TupleDescCopyEntry * This function copies a single attribute structure from one tuple diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ce1b907deb..13c8ba3b19 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -15,6 +15,7 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/session.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/namespace.h" @@ -36,6 +37,7 @@ #include "utils/memutils.h" #include "utils/resowner.h" #include "utils/snapmgr.h" +#include "utils/typcache.h" /* @@ -51,8 +53,9 @@ #define PARALLEL_MAGIC 0x50477c7c /* - * Magic numbers for parallel state sharing. Higher-level code should use - * smaller values, leaving these very large ones for use by this module. + * Magic numbers for per-context parallel state sharing. Higher-level code + * should use smaller values, leaving these very large ones for use by this + * module. */ #define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) #define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) @@ -63,6 +66,7 @@ #define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0007) #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) +#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -197,6 +201,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size segsize = 0; int i; FixedParallelState *fps; + dsm_handle session_dsm_handle = DSM_HANDLE_INVALID; Snapshot transaction_snapshot = GetTransactionSnapshot(); Snapshot active_snapshot = GetActiveSnapshot(); @@ -211,6 +216,21 @@ InitializeParallelDSM(ParallelContext *pcxt) * Normally, the user will have requested at least one worker process, but * if by chance they have not, we can skip a bunch of things here. */ + if (pcxt->nworkers > 0) + { + /* Get (or create) the per-session DSM segment's handle. */ + session_dsm_handle = GetSessionDsmHandle(); + + /* + * If we weren't able to create a per-session DSM segment, then we can + * continue but we can't safely launch any workers because their + * record typmods would be incompatible so they couldn't exchange + * tuples. + */ + if (session_dsm_handle == DSM_HANDLE_INVALID) + pcxt->nworkers = 0; + } + if (pcxt->nworkers > 0) { /* Estimate space for various kinds of state sharing. */ @@ -226,8 +246,9 @@ InitializeParallelDSM(ParallelContext *pcxt) shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 6); + shm_toc_estimate_keys(&pcxt->estimator, 7); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -295,6 +316,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *asnapspace; char *tstatespace; char *error_queue_space; + char *session_dsm_handle_space; char *entrypointstate; Size lnamelen; @@ -322,6 +344,13 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeSnapshot(active_snapshot, asnapspace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + /* Provide the handle for per-session segment. */ + session_dsm_handle_space = shm_toc_allocate(pcxt->toc, + sizeof(dsm_handle)); + *(dsm_handle *) session_dsm_handle_space = session_dsm_handle; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM, + session_dsm_handle_space); + /* Serialize transaction state. */ tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); SerializeTransactionState(tstatelen, tstatespace); @@ -938,6 +967,7 @@ ParallelWorkerMain(Datum main_arg) char *asnapspace; char *tstatespace; StringInfoData msgbuf; + char *session_dsm_handle_space; /* Set flag to indicate that we're initializing a parallel worker. */ InitializingParallelWorker = true; @@ -1064,6 +1094,11 @@ ParallelWorkerMain(Datum main_arg) combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false); RestoreComboCIDState(combocidspace); + /* Attach to the per-session DSM segment and contained objects. */ + session_dsm_handle_space = + shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false); + AttachSession(*(dsm_handle *) session_dsm_handle_space); + /* Restore transaction snapshot. */ tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, false); RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), @@ -1110,6 +1145,9 @@ ParallelWorkerMain(Datum main_arg) /* Shut down the parallel-worker transaction. */ EndParallelWorkerTransaction(); + /* Detach from the per-session DSM segment. */ + DetachSession(); + /* Report success. */ pq_putmessage('X', NULL, 0); } diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 82a1cf5150..f1060f9675 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 128; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); @@ -510,6 +510,12 @@ RegisterLWLockTranches(void) "predicate_lock_manager"); LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA, "parallel_query_dsa"); + LWLockRegisterTranche(LWTRANCHE_SESSION_DSA, + "session_dsa"); + LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE, + "session_record_table"); + LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE, + "session_typmod_table"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); /* Register named tranches. */ diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 2e633f08c5..3be853a85a 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -46,6 +46,8 @@ #include "access/heapam.h" #include "access/htup_details.h" #include "access/nbtree.h" +#include "access/parallel.h" +#include "access/session.h" #include "catalog/indexing.h" #include "catalog/pg_am.h" #include "catalog/pg_constraint.h" @@ -55,7 +57,9 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "executor/executor.h" +#include "lib/dshash.h" #include "optimizer/planner.h" +#include "storage/lwlock.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" @@ -142,6 +146,117 @@ typedef struct RecordCacheEntry TupleDesc tupdesc; } RecordCacheEntry; +/* + * To deal with non-anonymous record types that are exchanged by backends + * involved in a parallel query, we also need a shared verion of the above. + */ +struct SharedRecordTypmodRegistry +{ + /* A hash table for finding a matching TupleDesc. */ + dshash_table_handle record_table_handle; + /* A hash table for finding a TupleDesc by typmod. */ + dshash_table_handle typmod_table_handle; + /* A source of new record typmod numbers. */ + pg_atomic_uint32 next_typmod; +}; + +/* + * When using shared tuple descriptors as hash table keys we need a way to be + * able to search for an equal shared TupleDesc using a backend-local + * TupleDesc. So we use this type which can hold either, and hash and compare + * functions that know how to handle both. + */ +typedef struct SharedRecordTableKey +{ + union + { + TupleDesc local_tupdesc; + dsa_pointer shared_tupdesc; + }; + bool shared; +} SharedRecordTableKey; + +/* + * The shared version of RecordCacheEntry. This lets us look up a typmod + * using a TupleDesc which may be in local or shared memory. + */ +typedef struct SharedRecordTableEntry +{ + SharedRecordTableKey key; +} SharedRecordTableEntry; + +/* + * An entry in SharedRecordTypmodRegistry's typmod table. This lets us look + * up a TupleDesc in shared memory using a typmod. + */ +typedef struct SharedTypmodTableEntry +{ + uint32 typmod; + dsa_pointer shared_tupdesc; +} SharedTypmodTableEntry; + +/* + * A comparator function for SharedTupleDescTableKey. + */ +static int +shared_record_table_compare(const void *a, const void *b, size_t size, + void *arg) +{ + dsa_area *area = (dsa_area *) arg; + SharedRecordTableKey *k1 = (SharedRecordTableKey *) a; + SharedRecordTableKey *k2 = (SharedRecordTableKey *) b; + TupleDesc t1; + TupleDesc t2; + + if (k1->shared) + t1 = (TupleDesc) dsa_get_address(area, k1->shared_tupdesc); + else + t1 = k1->local_tupdesc; + + if (k2->shared) + t2 = (TupleDesc) dsa_get_address(area, k2->shared_tupdesc); + else + t2 = k2->local_tupdesc; + + return equalTupleDescs(t1, t2) ? 0 : 1; +} + +/* + * A hash function for SharedRecordTableKey. + */ +static uint32 +shared_record_table_hash(const void *a, size_t size, void *arg) +{ + dsa_area *area = (dsa_area *) arg; + SharedRecordTableKey *k = (SharedRecordTableKey *) a; + TupleDesc t; + + if (k->shared) + t = (TupleDesc) dsa_get_address(area, k->shared_tupdesc); + else + t = k->local_tupdesc; + + return hashTupleDesc(t); +} + +/* Parameters for SharedRecordTypmodRegistry's TupleDesc table. */ +static const dshash_parameters srtr_record_table_params = { + sizeof(SharedRecordTableKey), /* unused */ + sizeof(SharedRecordTableEntry), + shared_record_table_compare, + shared_record_table_hash, + LWTRANCHE_SESSION_RECORD_TABLE +}; + +/* Parameters for SharedRecordTypmodRegistry's typmod hash table. */ +static const dshash_parameters srtr_typmod_table_params = { + sizeof(uint32), + sizeof(SharedTypmodTableEntry), + dshash_memcmp, + dshash_memhash, + LWTRANCHE_SESSION_TYPMOD_TABLE +}; + static HTAB *RecordCacheHash = NULL; static TupleDesc *RecordCacheArray = NULL; @@ -168,6 +283,13 @@ static void TypeCacheConstrCallback(Datum arg, int cacheid, uint32 hashvalue); static void load_enum_cache_data(TypeCacheEntry *tcache); static EnumItem *find_enumitem(TypeCacheEnumData *enumdata, Oid arg); static int enum_oid_cmp(const void *left, const void *right); +static void shared_record_typmod_registry_detach(dsm_segment *segment, + Datum datum); +static void shared_record_typmod_registry_worker_detach(dsm_segment *segment, + Datum datum); +static TupleDesc find_or_make_matching_shared_tupledesc(TupleDesc tupdesc); +static dsa_pointer share_tupledesc(dsa_area *area, TupleDesc tupdesc, + uint32 typmod); /* @@ -377,8 +499,8 @@ lookup_type_cache(Oid type_id, int flags) /* * Reset info about hash functions whenever we pick up new info about - * equality operator. This is so we can ensure that the hash functions - * match the operator. + * equality operator. This is so we can ensure that the hash + * functions match the operator. */ typentry->flags &= ~(TCFLAGS_CHECKED_HASH_PROC); typentry->flags &= ~(TCFLAGS_CHECKED_HASH_EXTENDED_PROC); @@ -1243,6 +1365,33 @@ cache_record_field_properties(TypeCacheEntry *typentry) typentry->flags |= TCFLAGS_CHECKED_FIELD_PROPERTIES; } +/* + * Make sure that RecordCacheArray is large enough to store 'typmod'. + */ +static void +ensure_record_cache_typmod_slot_exists(int32 typmod) +{ + if (RecordCacheArray == NULL) + { + RecordCacheArray = (TupleDesc *) + MemoryContextAllocZero(CacheMemoryContext, 64 * sizeof(TupleDesc)); + RecordCacheArrayLen = 64; + } + + if (typmod >= RecordCacheArrayLen) + { + int32 newlen = RecordCacheArrayLen * 2; + + while (typmod >= newlen) + newlen *= 2; + + RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, + newlen * sizeof(TupleDesc)); + memset(RecordCacheArray + RecordCacheArrayLen, 0, + (newlen - RecordCacheArrayLen) * sizeof(TupleDesc *)); + RecordCacheArrayLen = newlen; + } +} /* * lookup_rowtype_tupdesc_internal --- internal routine to lookup a rowtype @@ -1273,15 +1422,53 @@ lookup_rowtype_tupdesc_internal(Oid type_id, int32 typmod, bool noError) /* * It's a transient record type, so look in our record-type table. */ - if (typmod < 0 || typmod >= NextRecordTypmod) + if (typmod >= 0) { - if (!noError) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("record type has not been registered"))); - return NULL; + /* It is already in our local cache? */ + if (typmod < RecordCacheArrayLen && + RecordCacheArray[typmod] != NULL) + return RecordCacheArray[typmod]; + + /* Are we attached to a shared record typmod registry? */ + if (CurrentSession->shared_typmod_registry != NULL) + { + SharedTypmodTableEntry *entry; + + /* Try to find it in the shared typmod index. */ + entry = dshash_find(CurrentSession->shared_typmod_table, + &typmod, false); + if (entry != NULL) + { + TupleDesc tupdesc; + + tupdesc = (TupleDesc) + dsa_get_address(CurrentSession->area, + entry->shared_tupdesc); + Assert(typmod == tupdesc->tdtypmod); + + /* We may need to extend the local RecordCacheArray. */ + ensure_record_cache_typmod_slot_exists(typmod); + + /* + * Our local array can now point directly to the TupleDesc + * in shared memory. + */ + RecordCacheArray[typmod] = tupdesc; + Assert(tupdesc->tdrefcount == -1); + + dshash_release_lock(CurrentSession->shared_typmod_table, + entry); + + return RecordCacheArray[typmod]; + } + } } - return RecordCacheArray[typmod]; + + if (!noError) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("record type has not been registered"))); + return NULL; } } @@ -1303,7 +1490,7 @@ lookup_rowtype_tupdesc(Oid type_id, int32 typmod) TupleDesc tupDesc; tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, false); - IncrTupleDescRefCount(tupDesc); + PinTupleDesc(tupDesc); return tupDesc; } @@ -1321,7 +1508,7 @@ lookup_rowtype_tupdesc_noerror(Oid type_id, int32 typmod, bool noError) tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, noError); if (tupDesc != NULL) - IncrTupleDescRefCount(tupDesc); + PinTupleDesc(tupDesc); return tupDesc; } @@ -1376,7 +1563,6 @@ assign_record_type_typmod(TupleDesc tupDesc) RecordCacheEntry *recentry; TupleDesc entDesc; bool found; - int32 newtypmod; MemoryContext oldcxt; Assert(tupDesc->tdtypeid == RECORDOID); @@ -1414,36 +1600,210 @@ assign_record_type_typmod(TupleDesc tupDesc) recentry->tupdesc = NULL; oldcxt = MemoryContextSwitchTo(CacheMemoryContext); - if (RecordCacheArray == NULL) + /* Look in the SharedRecordTypmodRegistry, if attached */ + entDesc = find_or_make_matching_shared_tupledesc(tupDesc); + if (entDesc == NULL) { - RecordCacheArray = (TupleDesc *) palloc(64 * sizeof(TupleDesc)); - RecordCacheArrayLen = 64; + /* Reference-counted local cache only. */ + entDesc = CreateTupleDescCopy(tupDesc); + entDesc->tdrefcount = 1; + entDesc->tdtypmod = NextRecordTypmod++; } - else if (NextRecordTypmod >= RecordCacheArrayLen) - { - int32 newlen = RecordCacheArrayLen * 2; - - RecordCacheArray = (TupleDesc *) repalloc(RecordCacheArray, - newlen * sizeof(TupleDesc)); - RecordCacheArrayLen = newlen; - } - - /* if fail in subrs, no damage except possibly some wasted memory... */ - entDesc = CreateTupleDescCopy(tupDesc); + ensure_record_cache_typmod_slot_exists(entDesc->tdtypmod); + RecordCacheArray[entDesc->tdtypmod] = entDesc; recentry->tupdesc = entDesc; - /* mark it as a reference-counted tupdesc */ - entDesc->tdrefcount = 1; - /* now it's safe to advance NextRecordTypmod */ - newtypmod = NextRecordTypmod++; - entDesc->tdtypmod = newtypmod; - RecordCacheArray[newtypmod] = entDesc; - /* report to caller as well */ - tupDesc->tdtypmod = newtypmod; + /* Update the caller's tuple descriptor. */ + tupDesc->tdtypmod = entDesc->tdtypmod; MemoryContextSwitchTo(oldcxt); } +/* + * Return the amout of shmem required to hold a SharedRecordTypmodRegistry. + * This exists only to avoid exposing private innards of + * SharedRecordTypmodRegistry in a header. + */ +size_t +SharedRecordTypmodRegistryEstimate(void) +{ + return sizeof(SharedRecordTypmodRegistry); +} + +/* + * Initialize 'registry' in a pre-existing shared memory region, which must be + * maximally aligned and have space for SharedRecordTypmodRegistryEstimate() + * bytes. + * + * 'area' will be used to allocate shared memory space as required for the + * typemod registration. The current process, expected to be a leader process + * in a parallel query, will be attached automatically and its current record + * types will be loaded into *registry. While attached, all calls to + * assign_record_type_typmod will use the shared registry. Worker backends + * will need to attach explicitly. + * + * Note that this function takes 'area' and 'segment' as arguments rather than + * accessing them via CurrentSession, because they aren't installed there + * until after this function runs. + */ +void +SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *registry, + dsm_segment *segment, + dsa_area *area) +{ + MemoryContext old_context; + dshash_table *record_table; + dshash_table *typmod_table; + int32 typmod; + + Assert(!IsParallelWorker()); + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSession->shared_typmod_registry == NULL); + Assert(CurrentSession->shared_record_table == NULL); + Assert(CurrentSession->shared_typmod_table == NULL); + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Create the hash table of tuple descriptors indexed by themselves. */ + record_table = dshash_create(area, &srtr_record_table_params, area); + + /* Create the hash table of tuple descriptors indexed by typmod. */ + typmod_table = dshash_create(area, &srtr_typmod_table_params, NULL); + + MemoryContextSwitchTo(old_context); + + /* Initialize the SharedRecordTypmodRegistry. */ + registry->record_table_handle = dshash_get_hash_table_handle(record_table); + registry->typmod_table_handle = dshash_get_hash_table_handle(typmod_table); + pg_atomic_init_u32(®istry->next_typmod, NextRecordTypmod); + + /* + * Copy all entries from this backend's private registry into the shared + * registry. + */ + for (typmod = 0; typmod < NextRecordTypmod; ++typmod) + { + SharedTypmodTableEntry *typmod_table_entry; + SharedRecordTableEntry *record_table_entry; + SharedRecordTableKey record_table_key; + dsa_pointer shared_dp; + TupleDesc tupdesc; + bool found; + + tupdesc = RecordCacheArray[typmod]; + if (tupdesc == NULL) + continue; + + /* Copy the TupleDesc into shared memory. */ + shared_dp = share_tupledesc(area, tupdesc, typmod); + + /* Insert into the typmod table. */ + typmod_table_entry = dshash_find_or_insert(typmod_table, + &tupdesc->tdtypmod, + &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + typmod_table_entry->typmod = tupdesc->tdtypmod; + typmod_table_entry->shared_tupdesc = shared_dp; + dshash_release_lock(typmod_table, typmod_table_entry); + + /* Insert into the record table. */ + record_table_key.shared = false; + record_table_key.local_tupdesc = tupdesc; + record_table_entry = dshash_find_or_insert(record_table, + &record_table_key, + &found); + if (!found) + { + record_table_entry->key.shared = true; + record_table_entry->key.shared_tupdesc = shared_dp; + } + dshash_release_lock(record_table, record_table_entry); + } + + /* + * Set up the global state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry. + */ + CurrentSession->shared_record_table = record_table; + CurrentSession->shared_typmod_table = typmod_table; + CurrentSession->shared_typmod_registry = registry; + + /* + * We install a detach hook in the leader, but only to handle cleanup on + * failure during GetSessionDsmHandle(). Once GetSessionDsmHandle() pins + * the memory, the leader process will use a shared registry until it + * exits. + */ + on_dsm_detach(segment, shared_record_typmod_registry_detach, (Datum) 0); +} + +/* + * Attach to 'registry', which must have been initialized already by another + * backend. Future calls to assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal will use the shared registry until the + * current session is detached. + */ +void +SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *registry) +{ + MemoryContext old_context; + dshash_table *record_table; + dshash_table *typmod_table; + + Assert(IsParallelWorker()); + + /* We can't already be attached to a shared registry. */ + Assert(CurrentSession != NULL); + Assert(CurrentSession->segment != NULL); + Assert(CurrentSession->area != NULL); + Assert(CurrentSession->shared_typmod_registry == NULL); + Assert(CurrentSession->shared_record_table == NULL); + Assert(CurrentSession->shared_typmod_table == NULL); + + /* + * We can't already have typmods in our local cache, because they'd clash + * with those imported by SharedRecordTypmodRegistryInit. This should be + * a freshly started parallel worker. If we ever support worker + * recycling, a worker would need to zap its local cache in between + * servicing different queries, in order to be able to call this and + * synchronize typmods with a new leader; see + * shared_record_typmod_registry_detach(). + */ + Assert(NextRecordTypmod == 0); + + old_context = MemoryContextSwitchTo(TopMemoryContext); + + /* Attach to the two hash tables. */ + record_table = dshash_attach(CurrentSession->area, + &srtr_record_table_params, + registry->record_table_handle, + CurrentSession->area); + typmod_table = dshash_attach(CurrentSession->area, + &srtr_typmod_table_params, + registry->typmod_table_handle, + NULL); + + MemoryContextSwitchTo(old_context); + + /* + * We install a different detach callback that performs a more complete + * reset of backend local state. + */ + on_dsm_detach(CurrentSession->segment, + shared_record_typmod_registry_worker_detach, + PointerGetDatum(registry)); + + /* + * Set up the session state that will tell assign_record_type_typmod and + * lookup_rowtype_tupdesc_internal about the shared registry. + */ + CurrentSession->shared_typmod_registry = registry; + CurrentSession->shared_record_table = record_table; + CurrentSession->shared_typmod_table = typmod_table; +} + /* * TypeCacheRelCallback * Relcache inval callback function @@ -1858,3 +2218,213 @@ enum_oid_cmp(const void *left, const void *right) else return 0; } + +/* + * Copy 'tupdesc' into newly allocated shared memory in 'area', set its typmod + * to the given value and return a dsa_pointer. + */ +static dsa_pointer +share_tupledesc(dsa_area *area, TupleDesc tupdesc, uint32 typmod) +{ + dsa_pointer shared_dp; + TupleDesc shared; + + shared_dp = dsa_allocate(area, TupleDescSize(tupdesc)); + shared = (TupleDesc) dsa_get_address(area, shared_dp); + TupleDescCopy(shared, tupdesc); + shared->tdtypmod = typmod; + + return shared_dp; +} + +/* + * If we are attached to a SharedRecordTypmodRegistry, use it to find or + * create a shared TupleDesc that matches 'tupdesc'. Otherwise return NULL. + * Tuple descriptors returned by this function are not reference counted, and + * will exist at least as long as the current backend remained attached to the + * current session. + */ +static TupleDesc +find_or_make_matching_shared_tupledesc(TupleDesc tupdesc) +{ + TupleDesc result; + SharedRecordTableKey key; + SharedRecordTableEntry *record_table_entry; + SharedTypmodTableEntry *typmod_table_entry; + dsa_pointer shared_dp; + bool found; + uint32 typmod; + + /* If not even attached, nothing to do. */ + if (CurrentSession->shared_typmod_registry == NULL) + return NULL; + + /* Try to find a matching tuple descriptor in the record table. */ + key.shared = false; + key.local_tupdesc = tupdesc; + record_table_entry = (SharedRecordTableEntry *) + dshash_find(CurrentSession->shared_record_table, &key, false); + if (record_table_entry) + { + Assert(record_table_entry->key.shared); + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, + record_table_entry->key.shared_tupdesc); + Assert(result->tdrefcount == -1); + + return result; + } + + /* Allocate a new typmod number. This will be wasted if we error out. */ + typmod = (int) + pg_atomic_fetch_add_u32(&CurrentSession->shared_typmod_registry->next_typmod, + 1); + + /* Copy the TupleDesc into shared memory. */ + shared_dp = share_tupledesc(CurrentSession->area, tupdesc, typmod); + + /* + * Create an entry in the typmod table so that others will understand this + * typmod number. + */ + PG_TRY(); + { + typmod_table_entry = (SharedTypmodTableEntry *) + dshash_find_or_insert(CurrentSession->shared_typmod_table, + &typmod, &found); + if (found) + elog(ERROR, "cannot create duplicate shared record typmod"); + } + PG_CATCH(); + { + dsa_free(CurrentSession->area, shared_dp); + PG_RE_THROW(); + } + PG_END_TRY(); + typmod_table_entry->typmod = typmod; + typmod_table_entry->shared_tupdesc = shared_dp; + dshash_release_lock(CurrentSession->shared_typmod_table, + typmod_table_entry); + + /* + * Finally create an entry in the record table so others with matching + * tuple descriptors can reuse the typmod. + */ + record_table_entry = (SharedRecordTableEntry *) + dshash_find_or_insert(CurrentSession->shared_record_table, &key, + &found); + if (found) + { + /* + * Someone concurrently inserted a matching tuple descriptor since the + * first time we checked. Use that one instead. + */ + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + + /* Might as well free up the space used by the one we created. */ + found = dshash_delete_key(CurrentSession->shared_typmod_table, + &typmod); + Assert(found); + dsa_free(CurrentSession->area, shared_dp); + + /* Return the one we found. */ + Assert(record_table_entry->key.shared); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, + record_table_entry->key.shared); + Assert(result->tdrefcount == -1); + + return result; + } + + /* Store it and return it. */ + record_table_entry->key.shared = true; + record_table_entry->key.shared_tupdesc = shared_dp; + dshash_release_lock(CurrentSession->shared_record_table, + record_table_entry); + result = (TupleDesc) + dsa_get_address(CurrentSession->area, shared_dp); + Assert(result->tdrefcount == -1); + + return result; +} + +/* + * Detach hook to forget about the current shared record typmod + * infrastructure. This is registered directly in leader backends, and + * reached only in case of error or shutdown. It's also reached indirectly + * via the worker detach callback below. + */ +static void +shared_record_typmod_registry_detach(dsm_segment *segment, Datum datum) +{ + /* Be cautious here: maybe we didn't finish initializing. */ + if (CurrentSession->shared_record_table != NULL) + { + dshash_detach(CurrentSession->shared_record_table); + CurrentSession->shared_record_table = NULL; + } + if (CurrentSession->shared_typmod_table != NULL) + { + dshash_detach(CurrentSession->shared_typmod_table); + CurrentSession->shared_typmod_table = NULL; + } + CurrentSession->shared_typmod_registry = NULL; +} + +/* + * Deatch hook allowing workers to disconnect from shared record typmod + * registry. The resulting state should allow a worker to attach to a + * different leader, if worker reuse pools are invented. + */ +static void +shared_record_typmod_registry_worker_detach(dsm_segment *segment, Datum datum) +{ + /* + * Forget everything we learned about record typmods as part of the + * session we are disconnecting from, and return to the initial state. + */ + if (RecordCacheArray != NULL) + { + int32 i; + + for (i = 0; i < RecordCacheArrayLen; ++i) + { + if (RecordCacheArray[i] != NULL) + { + TupleDesc tupdesc = RecordCacheArray[i]; + + /* + * Pointers to tuple descriptors in shared memory are not + * reference counted, so we are not responsible for freeing + * them. They'll survive as long as the shared session + * exists, which should be as long as the owning leader + * backend exists. In theory we do need to free local + * reference counted tuple descriptors however, and we can't + * do that with DescTupleDescRefCount() because we aren't + * using a resource owner. In practice we don't expect to + * find any non-shared TupleDesc object in a worker. + */ + if (tupdesc->tdrefcount != -1) + { + Assert(tupdesc->tdrefcount > 0); + if (--tupdesc->tdrefcount == 0) + FreeTupleDesc(tupdesc); + } + } + } + pfree(RecordCacheArray); + RecordCacheArray = NULL; + } + if (RecordCacheHash != NULL) + { + hash_destroy(RecordCacheHash); + RecordCacheHash = NULL; + } + NextRecordTypmod = 0; + /* Call the code common to leader and worker detach. */ + shared_record_typmod_registry_detach(segment, datum); +} diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index eb6960d93f..20f1d279e9 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -21,6 +21,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/session.h" #include "access/sysattr.h" #include "access/xact.h" #include "access/xlog.h" @@ -1027,6 +1028,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, /* initialize client encoding */ InitializeClientEncoding(); + /* Initialize this backend's session state. */ + InitializeSession(); + /* report this backend in the PgBackendStatus array */ if (!bootstrap) pgstat_bestart(); diff --git a/src/include/access/session.h b/src/include/access/session.h new file mode 100644 index 0000000000..8376dc5312 --- /dev/null +++ b/src/include/access/session.h @@ -0,0 +1,44 @@ +/*------------------------------------------------------------------------- + * + * session.h + * Encapsulation of user session. + * + * Copyright (c) 2017, PostgreSQL Global Development Group + * + * src/include/access/session.h + * + *------------------------------------------------------------------------- + */ +#ifndef SESSION_H +#define SESSION_H + +#include "lib/dshash.h" + +/* Defined in typcache.c */ +typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry; + +/* + * A struct encapsulating some elements of a user's session. For now this + * manages state that applies to parallel query, but it principle it could + * include other things that are currently global variables. + */ +typedef struct Session +{ + dsm_segment *segment; /* The session-scoped DSM segment. */ + dsa_area *area; /* The session-scoped DSA area. */ + + /* State managed by typcache.c. */ + SharedRecordTypmodRegistry *shared_typmod_registry; + dshash_table *shared_record_table; + dshash_table *shared_typmod_table; +} Session; + +extern void InitializeSession(void); +extern dsm_handle GetSessionDsmHandle(void); +extern void AttachSession(dsm_handle handle); +extern void DetachSession(void); + +/* The current session, or NULL for none. */ +extern Session *CurrentSession; + +#endif /* SESSION_H */ diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h index 989fe738bb..c15610e767 100644 --- a/src/include/access/tupdesc.h +++ b/src/include/access/tupdesc.h @@ -92,6 +92,12 @@ extern TupleDesc CreateTupleDescCopy(TupleDesc tupdesc); extern TupleDesc CreateTupleDescCopyConstr(TupleDesc tupdesc); +#define TupleDescSize(src) \ + (offsetof(struct tupleDesc, attrs) + \ + (src)->natts * sizeof(FormData_pg_attribute)) + +extern void TupleDescCopy(TupleDesc dst, TupleDesc src); + extern void TupleDescCopyEntry(TupleDesc dst, AttrNumber dstAttno, TupleDesc src, AttrNumber srcAttno); diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 3d16132c88..f4c4aed7f9 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -212,6 +212,9 @@ typedef enum BuiltinTrancheIds LWTRANCHE_LOCK_MANAGER, LWTRANCHE_PREDICATE_LOCK_MANAGER, LWTRANCHE_PARALLEL_QUERY_DSA, + LWTRANCHE_SESSION_DSA, + LWTRANCHE_SESSION_RECORD_TABLE, + LWTRANCHE_SESSION_TYPMOD_TABLE, LWTRANCHE_TBM, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index b4f7592162..41b645a58f 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -18,6 +18,8 @@ #include "access/tupdesc.h" #include "fmgr.h" +#include "storage/dsm.h" +#include "utils/dsa.h" /* DomainConstraintCache is an opaque struct known only within typcache.c */ @@ -143,6 +145,7 @@ typedef struct DomainConstraintRef MemoryContextCallback callback; /* used to release refcount when done */ } DomainConstraintRef; +typedef struct SharedRecordTypmodRegistry SharedRecordTypmodRegistry; extern TypeCacheEntry *lookup_type_cache(Oid type_id, int flags); @@ -164,4 +167,11 @@ extern void assign_record_type_typmod(TupleDesc tupDesc); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern size_t SharedRecordTypmodRegistryEstimate(void); + +extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, + dsm_segment *segment, dsa_area *area); + +extern void SharedRecordTypmodRegistryAttach(SharedRecordTypmodRegistry *); + #endif /* TYPCACHE_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 17ba2bde5c..8ce97da2ee 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2016,6 +2016,10 @@ SharedInvalRelmapMsg SharedInvalSmgrMsg SharedInvalSnapshotMsg SharedInvalidationMessage +SharedRecordTableKey +SharedRecordTableEntry +SharedRecordTypmodRegistry +SharedTypmodTableEntry ShellTypeInfo ShippableCacheEntry ShippableCacheKey