diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index 5c33c40ae9..32994719e3 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -122,6 +122,9 @@ worker. This includes: values are restored, this incidentally sets SessionUserId and OuterUserId to the correct values. This final step restores CurrentUserId. + - State related to pending REINDEX operations, which prevents access to + an index that is currently being rebuilt. + To prevent undetected or unprincipled deadlocks when running in parallel mode, this could should eventually handle heavyweight locks in some way. This is not implemented yet. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index f720896e50..0a0157a878 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -18,6 +18,7 @@ #include "access/session.h" #include "access/xact.h" #include "access/xlog.h" +#include "catalog/index.h" #include "catalog/namespace.h" #include "commands/async.h" #include "executor/execParallel.h" @@ -67,6 +68,7 @@ #define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008) #define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009) #define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A) +#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt) Size tsnaplen = 0; Size asnaplen = 0; Size tstatelen = 0; + Size reindexlen = 0; Size segsize = 0; int i; FixedParallelState *fps; @@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt) tstatelen = EstimateTransactionStateSpace(); shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle)); + reindexlen = EstimateReindexStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, reindexlen); /* If you add more chunks here, you probably need to add keys. */ - shm_toc_estimate_keys(&pcxt->estimator, 7); + shm_toc_estimate_keys(&pcxt->estimator, 8); /* Estimate space need for error queues. */ StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == @@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *tsnapspace; char *asnapspace; char *tstatespace; + char *reindexspace; char *error_queue_space; char *session_dsm_handle_space; char *entrypointstate; @@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt) SerializeTransactionState(tstatelen, tstatespace); shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + /* Serialize reindex state. */ + reindexspace = shm_toc_allocate(pcxt->toc, reindexlen); + SerializeReindexState(reindexlen, reindexspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace); + /* Allocate space for worker information. */ pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); @@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg) char *tsnapspace; char *asnapspace; char *tstatespace; + char *reindexspace; StringInfoData msgbuf; char *session_dsm_handle_space; @@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg) /* Set ParallelMasterBackendId so we know how to address temp relations. */ ParallelMasterBackendId = fps->parallel_master_backend_id; + /* Restore reindex state. */ + reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false); + RestoreReindexState(reindexspace); + /* * We've initialized all of our state now; nothing should change * hereafter. diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 330488b96f..007b929a6f 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -86,6 +86,18 @@ typedef struct tups_inserted; } v_i_state; +/* + * Pointer-free representation of variables used when reindexing system + * catalogs; we use this to propagate those values to parallel workers. + */ +typedef struct +{ + Oid currentlyReindexedHeap; + Oid currentlyReindexedIndex; + int numPendingReindexedIndexes; + Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER]; +} SerializedReindexState; + /* non-export function prototypes */ static bool relationHasPrimaryKey(Relation rel); static TupleDesc ConstructTupleDescriptor(Relation heapRelation, @@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options) * When we are busy reindexing a system index, this code provides support * for preventing catalog lookups from using that index. We also make use * of this to catch attempted uses of user indexes during reindexing of - * those indexes. + * those indexes. This information is propagated to parallel workers; + * attempting to change it during a parallel operation is not permitted. * ---------------------------------------------------------------- */ @@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid) static void ResetReindexProcessing(void) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); currentlyReindexedHeap = InvalidOid; currentlyReindexedIndex = InvalidOid; } @@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes) /* Reindexing is not re-entrant. */ if (pendingReindexedIndexes) elog(ERROR, "cannot reindex while reindexing"); + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = list_copy(indexes); } @@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes) static void RemoveReindexPending(Oid indexOid) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes, indexOid); } @@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid) static void ResetReindexPending(void) { + if (IsInParallelMode()) + elog(ERROR, "cannot modify reindex state during a parallel operation"); pendingReindexedIndexes = NIL; } + +/* + * EstimateReindexStateSpace + * Estimate space needed to pass reindex state to parallel workers. + */ +extern Size +EstimateReindexStateSpace(void) +{ + return offsetof(SerializedReindexState, pendingReindexedIndexes) + + mul_size(sizeof(Oid), list_length(pendingReindexedIndexes)); +} + +/* + * SerializeReindexState + * Serialize reindex state for parallel workers. + */ +extern void +SerializeReindexState(Size maxsize, char *start_address) +{ + SerializedReindexState *sistate = (SerializedReindexState *) start_address; + int c = 0; + ListCell *lc; + + sistate->currentlyReindexedHeap = currentlyReindexedHeap; + sistate->currentlyReindexedIndex = currentlyReindexedIndex; + sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes); + foreach(lc, pendingReindexedIndexes) + sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc); +} + +/* + * RestoreReindexState + * Restore reindex state in a parallel worker. + */ +extern void +RestoreReindexState(void *reindexstate) +{ + SerializedReindexState *sistate = (SerializedReindexState *) reindexstate; + int c = 0; + MemoryContext oldcontext; + + currentlyReindexedHeap = sistate->currentlyReindexedHeap; + currentlyReindexedIndex = sistate->currentlyReindexedIndex; + + Assert(pendingReindexedIndexes == NIL); + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + for (c = 0; c < sistate->numPendingReindexedIndexes; ++c) + pendingReindexedIndexes = + lappend_oid(pendingReindexedIndexes, + sistate->pendingReindexedIndexes[c]); + MemoryContextSwitchTo(oldcontext); +} diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 12bf35567a..4790f0c735 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid); extern bool ReindexIsProcessingIndex(Oid indexOid); extern Oid IndexGetRelation(Oid indexId, bool missing_ok); +extern Size EstimateReindexStateSpace(void); +extern void SerializeReindexState(Size maxsize, char *start_address); +extern void RestoreReindexState(void *reindexstate); + #endif /* INDEX_H */