diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c index 06077afed6..858798db85 100644 --- a/contrib/bloom/blutils.c +++ b/contrib/bloom/blutils.c @@ -138,6 +138,9 @@ blhandler(PG_FUNCTION_ARGS) amroutine->amendscan = blendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index 40f201b11b..5d8e557460 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -131,6 +131,11 @@ typedef struct IndexAmRoutine amendscan_function amendscan; ammarkpos_function ammarkpos; /* can be NULL */ amrestrpos_function amrestrpos; /* can be NULL */ + + /* interface functions to support parallel index scans */ + amestimateparallelscan_function amestimateparallelscan; /* can be NULL */ + aminitparallelscan_function aminitparallelscan; /* can be NULL */ + amparallelrescan_function amparallelrescan; /* can be NULL */ } IndexAmRoutine; @@ -624,6 +629,68 @@ amrestrpos (IndexScanDesc scan); the amrestrpos field in its IndexAmRoutine struct may be set to NULL. + + + In addition to supporting ordinary index scans, some types of index + may wish to support parallel index scans, which allow + multiple backends to cooperate in performing an index scan. The + index access method should arrange things so that each cooperating + process returns a subset of the tuples that would be performed by + an ordinary, non-parallel index scan, but in such a way that the + union of those subsets is equal to the set of tuples that would be + returned by an ordinary, non-parallel index scan. Furthermore, while + there need not be any global ordering of tuples returned by a parallel + scan, the ordering of that subset of tuples returned within each + cooperating backend must match the requested ordering. The following + functions may be implemented to support parallel index scans: + + + + +Size +amestimateparallelscan (void); + + Estimate and return the number of bytes of dynamic shared memory which + the access method will be needed to perform a parallel scan. (This number + is in addition to, not in lieu of, the amount of space needed for + AM-independent data in ParallelIndexScanDescData.) + + + + It is not necessary to implement this function for access methods which + do not support parallel scans or for which the number of additional bytes + of storage required is zero. + + + + +void +aminitparallelscan (void *target); + + This function will be called to initialize dynamic shared memory at the + beginning of a parallel scan. target will point to at least + the number of bytes previously returned by + amestimateparallelscan, and this function may use that + amount of space to store whatever data it wishes. + + + + It is not necessary to implement this function for access methods which + do not support parallel scans or in cases where the shared memory space + required needs no initialization. + + + + +void +amparallelrescan (IndexScanDesc scan); + + This function, if implemented, will be called when a parallel index scan + must be restarted. It should reset any shared state set up by + aminitparallelscan such that the scan will be restarted from + the beginning. + + diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index d60ddd242c..b2afdb7bed 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -112,6 +112,9 @@ brinhandler(PG_FUNCTION_ARGS) amroutine->amendscan = brinendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c index 3909638906..02d920bb9d 100644 --- a/src/backend/access/gin/ginutil.c +++ b/src/backend/access/gin/ginutil.c @@ -68,6 +68,9 @@ ginhandler(PG_FUNCTION_ARGS) amroutine->amendscan = ginendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index 597056ae44..c2247ad2f7 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -89,6 +89,9 @@ gisthandler(PG_FUNCTION_ARGS) amroutine->amendscan = gistendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index a64a9b9696..ec8ed33c70 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -86,6 +86,9 @@ hashhandler(PG_FUNCTION_ARGS) amroutine->amendscan = hashendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 4822af95a3..ba27c1e86d 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -20,6 +20,10 @@ * index_insert - insert an index tuple into a relation * index_markpos - mark a scan position * index_restrpos - restore a scan position + * index_parallelscan_estimate - estimate shared memory for parallel scan + * index_parallelscan_initialize - initialize parallel scan + * index_parallelrescan - (re)start a parallel scan of an index + * index_beginscan_parallel - join parallel index scan * index_getnext_tid - get the next TID from a scan * index_fetch_heap - get the scan's next heap tuple * index_getnext - get the next heap tuple from a scan @@ -120,7 +124,8 @@ do { \ } while(0) static IndexScanDesc index_beginscan_internal(Relation indexRelation, - int nkeys, int norderbys, Snapshot snapshot); + int nkeys, int norderbys, Snapshot snapshot, + ParallelIndexScanDesc pscan, bool temp_snap); /* ---------------------------------------------------------------- @@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation, { IndexScanDesc scan; - scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot); + scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot, NULL, false); /* * Save additional parameters into the scandesc. Everything else was set @@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation, { IndexScanDesc scan; - scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot); + scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot, NULL, false); /* * Save additional parameters into the scandesc. Everything else was set @@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation, */ static IndexScanDesc index_beginscan_internal(Relation indexRelation, - int nkeys, int norderbys, Snapshot snapshot) + int nkeys, int norderbys, Snapshot snapshot, + ParallelIndexScanDesc pscan, bool temp_snap) { + IndexScanDesc scan; + RELATION_CHECKS; CHECK_REL_PROCEDURE(ambeginscan); @@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation, /* * Tell the AM to open a scan. */ - return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys, + scan = indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys, norderbys); + /* Initialize information for parallel scan. */ + scan->parallel_scan = pscan; + scan->xs_temp_snap = temp_snap; + + return scan; } /* ---------------- @@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan) /* Release index refcount acquired by index_beginscan */ RelationDecrementReferenceCount(scan->indexRelation); + if (scan->xs_temp_snap) + UnregisterSnapshot(scan->xs_snapshot); + /* Release the scan data structure itself */ IndexScanEnd(scan); } @@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan) scan->indexRelation->rd_amroutine->amrestrpos(scan); } +/* + * index_parallelscan_estimate - estimate shared memory for parallel scan + * + * Currently, we don't pass any information to the AM-specific estimator, + * so it can probably only return a constant. In the future, we might need + * to pass more information. + */ +Size +index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot) +{ + Size nbytes; + + RELATION_CHECKS; + + nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data); + nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot)); + nbytes = MAXALIGN(nbytes); + + /* + * If amestimateparallelscan is not provided, assume there is no + * AM-specific data needed. (It's hard to believe that could work, but + * it's easy enough to cater to it here.) + */ + if (indexRelation->rd_amroutine->amestimateparallelscan != NULL) + nbytes = add_size(nbytes, + indexRelation->rd_amroutine->amestimateparallelscan()); + + return nbytes; +} + +/* + * index_parallelscan_initialize - initialize parallel scan + * + * We initialize both the ParallelIndexScanDesc proper and the AM-specific + * information which follows it. + * + * This function calls access method specific initialization routine to + * initialize am specific information. Call this just once in the leader + * process; then, individual workers attach via index_beginscan_parallel. + */ +void +index_parallelscan_initialize(Relation heapRelation, Relation indexRelation, + Snapshot snapshot, ParallelIndexScanDesc target) +{ + Size offset; + + RELATION_CHECKS; + + offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data), + EstimateSnapshotSpace(snapshot)); + offset = MAXALIGN(offset); + + target->ps_relid = RelationGetRelid(heapRelation); + target->ps_indexid = RelationGetRelid(indexRelation); + target->ps_offset = offset; + SerializeSnapshot(snapshot, target->ps_snapshot_data); + + /* aminitparallelscan is optional; assume no-op if not provided by AM */ + if (indexRelation->rd_amroutine->aminitparallelscan != NULL) + { + void *amtarget; + + amtarget = OffsetToPointer(target, offset); + indexRelation->rd_amroutine->aminitparallelscan(amtarget); + } +} + +/* ---------------- + * index_parallelrescan - (re)start a parallel scan of an index + * ---------------- + */ +void +index_parallelrescan(IndexScanDesc scan) +{ + SCAN_CHECKS; + + /* amparallelrescan is optional; assume no-op if not provided by AM */ + if (scan->indexRelation->rd_amroutine->amparallelrescan != NULL) + scan->indexRelation->rd_amroutine->amparallelrescan(scan); +} + +/* + * index_beginscan_parallel - join parallel index scan + * + * Caller must be holding suitable locks on the heap and the index. + */ +IndexScanDesc +index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, + int norderbys, ParallelIndexScanDesc pscan) +{ + Snapshot snapshot; + IndexScanDesc scan; + + Assert(RelationGetRelid(heaprel) == pscan->ps_relid); + snapshot = RestoreSnapshot(pscan->ps_snapshot_data); + RegisterSnapshot(snapshot); + scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot, + pscan, true); + + /* + * Save additional parameters into the scandesc. Everything else was set + * up by index_beginscan_internal. + */ + scan->heapRelation = heaprel; + scan->xs_snapshot = snapshot; + + return scan; +} + /* ---------------- * index_getnext_tid - get the next TID from a scan * diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 1bb1acfea6..469e7abe4d 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -118,6 +118,9 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amendscan = btendscan; amroutine->ammarkpos = btmarkpos; amroutine->amrestrpos = btrestrpos; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index ca4b0bdbe4..78846bec66 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -68,6 +68,9 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amendscan = spgendscan; amroutine->ammarkpos = NULL; amroutine->amrestrpos = NULL; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; + amroutine->amparallelrescan = NULL; PG_RETURN_POINTER(amroutine); } diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 6a5f279e7f..e91e41dc0f 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -137,6 +137,18 @@ typedef void (*ammarkpos_function) (IndexScanDesc scan); /* restore marked scan position */ typedef void (*amrestrpos_function) (IndexScanDesc scan); +/* + * Callback function signatures - for parallel index scans. + */ + +/* estimate size of parallel scan descriptor */ +typedef Size (*amestimateparallelscan_function) (void); + +/* prepare for parallel index scan */ +typedef void (*aminitparallelscan_function) (void *target); + +/* (re)start parallel index scan */ +typedef void (*amparallelrescan_function) (IndexScanDesc scan); /* * API struct for an index AM. Note this must be stored in a single palloc'd @@ -196,6 +208,11 @@ typedef struct IndexAmRoutine amendscan_function amendscan; ammarkpos_function ammarkpos; /* can be NULL */ amrestrpos_function amrestrpos; /* can be NULL */ + + /* interface functions to support parallel index scans */ + amestimateparallelscan_function amestimateparallelscan; /* can be NULL */ + aminitparallelscan_function aminitparallelscan; /* can be NULL */ + amparallelrescan_function amparallelrescan; /* can be NULL */ } IndexAmRoutine; diff --git a/src/include/access/genam.h b/src/include/access/genam.h index b2e078aed2..51466b96e8 100644 --- a/src/include/access/genam.h +++ b/src/include/access/genam.h @@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state); typedef struct IndexScanDescData *IndexScanDesc; typedef struct SysScanDescData *SysScanDesc; +typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc; + /* * Enumeration specifying the type of uniqueness check to perform in * index_insert(). @@ -144,6 +146,13 @@ extern void index_rescan(IndexScanDesc scan, extern void index_endscan(IndexScanDesc scan); extern void index_markpos(IndexScanDesc scan); extern void index_restrpos(IndexScanDesc scan); +extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot); +extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel, + Snapshot snapshot, ParallelIndexScanDesc target); +extern void index_parallelrescan(IndexScanDesc scan); +extern IndexScanDesc index_beginscan_parallel(Relation heaprel, + Relation indexrel, int nkeys, int norderbys, + ParallelIndexScanDesc pscan); extern ItemPointer index_getnext_tid(IndexScanDesc scan, ScanDirection direction); extern HeapTuple index_fetch_heap(IndexScanDesc scan); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 8746045d8d..ce3ca8d4ac 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -93,6 +93,7 @@ typedef struct IndexScanDescData ScanKey keyData; /* array of index qualifier descriptors */ ScanKey orderByData; /* array of ordering op descriptors */ bool xs_want_itup; /* caller requests index tuples */ + bool xs_temp_snap; /* unregister snapshot at scan end? */ /* signaling to index AM about killing index tuples */ bool kill_prior_tuple; /* last-returned tuple is dead */ @@ -126,8 +127,20 @@ typedef struct IndexScanDescData /* state data for traversing HOT chains in index_getnext */ bool xs_continue_hot; /* T if must keep walking HOT chain */ + + /* parallel index scan information, in shared memory */ + ParallelIndexScanDesc parallel_scan; } IndexScanDescData; +/* Generic structure for parallel scans */ +typedef struct ParallelIndexScanDescData +{ + Oid ps_relid; + Oid ps_indexid; + Size ps_offset; /* Offset in bytes of am specific structure */ + char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelIndexScanDescData; + /* Struct for heap-or-index scans of system tables */ typedef struct SysScanDescData { diff --git a/src/include/c.h b/src/include/c.h index efbb77f540..a2c043adfb 100644 --- a/src/include/c.h +++ b/src/include/c.h @@ -527,6 +527,9 @@ typedef NameData *Name; #define PointerIsAligned(pointer, type) \ (((uintptr_t)(pointer) % (sizeof (type))) == 0) +#define OffsetToPointer(base, offset) \ + ((void *)((char *) base + offset)) + #define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid)) #define RegProcedureIsValid(p) OidIsValid(p) diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 993880da43..c4235ae63a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1264,6 +1264,8 @@ OverrideSearchPath OverrideStackEntry PACE_HEADER PACL +ParallelIndexScanDesc +ParallelIndexScanDescData PATH PBOOL PCtxtHandle