Transfer state pertaining to pending REINDEX operations to workers.
This will allow the pending patch for parallel CREATE INDEX to work on system catalogs, and to provide the same level of protection against use of user indexes while they are being rebuilt that we have for non-parallel CREATE INDEX. Patch by me, reviewed by Peter Geoghegan. Discussion: http://postgr.es/m/CA+TgmoYN-YQU9JsGQcqFLovZ-C+Xgp1_xhJQad=cunGG-_p5gg@mail.gmail.com Discussion: http://postgr.es/m/CAH2-Wzkv4UNkXYhqQRqk-u9rS7h5c-4cCW+EqQ8K_WSeS43aZg@mail.gmail.com
This commit is contained in:
parent
4e54dd2e0a
commit
29d58fd3ad
|
@ -122,6 +122,9 @@ worker. This includes:
|
||||||
values are restored, this incidentally sets SessionUserId and OuterUserId
|
values are restored, this incidentally sets SessionUserId and OuterUserId
|
||||||
to the correct values. This final step restores CurrentUserId.
|
to the correct values. This final step restores CurrentUserId.
|
||||||
|
|
||||||
|
- State related to pending REINDEX operations, which prevents access to
|
||||||
|
an index that is currently being rebuilt.
|
||||||
|
|
||||||
To prevent undetected or unprincipled deadlocks when running in parallel mode,
|
To prevent undetected or unprincipled deadlocks when running in parallel mode,
|
||||||
this could should eventually handle heavyweight locks in some way. This is
|
this could should eventually handle heavyweight locks in some way. This is
|
||||||
not implemented yet.
|
not implemented yet.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "access/session.h"
|
#include "access/session.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
|
#include "catalog/index.h"
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "commands/async.h"
|
#include "commands/async.h"
|
||||||
#include "executor/execParallel.h"
|
#include "executor/execParallel.h"
|
||||||
|
@ -67,6 +68,7 @@
|
||||||
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
|
#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0008)
|
||||||
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
|
#define PARALLEL_KEY_ENTRYPOINT UINT64CONST(0xFFFFFFFFFFFF0009)
|
||||||
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
|
#define PARALLEL_KEY_SESSION_DSM UINT64CONST(0xFFFFFFFFFFFF000A)
|
||||||
|
#define PARALLEL_KEY_REINDEX_STATE UINT64CONST(0xFFFFFFFFFFFF000B)
|
||||||
|
|
||||||
/* Fixed-size parallel state. */
|
/* Fixed-size parallel state. */
|
||||||
typedef struct FixedParallelState
|
typedef struct FixedParallelState
|
||||||
|
@ -200,6 +202,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
|
||||||
Size tsnaplen = 0;
|
Size tsnaplen = 0;
|
||||||
Size asnaplen = 0;
|
Size asnaplen = 0;
|
||||||
Size tstatelen = 0;
|
Size tstatelen = 0;
|
||||||
|
Size reindexlen = 0;
|
||||||
Size segsize = 0;
|
Size segsize = 0;
|
||||||
int i;
|
int i;
|
||||||
FixedParallelState *fps;
|
FixedParallelState *fps;
|
||||||
|
@ -249,8 +252,10 @@ InitializeParallelDSM(ParallelContext *pcxt)
|
||||||
tstatelen = EstimateTransactionStateSpace();
|
tstatelen = EstimateTransactionStateSpace();
|
||||||
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
|
shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
|
||||||
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
|
shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
|
||||||
|
reindexlen = EstimateReindexStateSpace();
|
||||||
|
shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
|
||||||
/* If you add more chunks here, you probably need to add keys. */
|
/* If you add more chunks here, you probably need to add keys. */
|
||||||
shm_toc_estimate_keys(&pcxt->estimator, 7);
|
shm_toc_estimate_keys(&pcxt->estimator, 8);
|
||||||
|
|
||||||
/* Estimate space need for error queues. */
|
/* Estimate space need for error queues. */
|
||||||
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
|
StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
|
||||||
|
@ -319,6 +324,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
|
||||||
char *tsnapspace;
|
char *tsnapspace;
|
||||||
char *asnapspace;
|
char *asnapspace;
|
||||||
char *tstatespace;
|
char *tstatespace;
|
||||||
|
char *reindexspace;
|
||||||
char *error_queue_space;
|
char *error_queue_space;
|
||||||
char *session_dsm_handle_space;
|
char *session_dsm_handle_space;
|
||||||
char *entrypointstate;
|
char *entrypointstate;
|
||||||
|
@ -360,6 +366,11 @@ InitializeParallelDSM(ParallelContext *pcxt)
|
||||||
SerializeTransactionState(tstatelen, tstatespace);
|
SerializeTransactionState(tstatelen, tstatespace);
|
||||||
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
|
||||||
|
|
||||||
|
/* Serialize reindex state. */
|
||||||
|
reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
|
||||||
|
SerializeReindexState(reindexlen, reindexspace);
|
||||||
|
shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
|
||||||
|
|
||||||
/* Allocate space for worker information. */
|
/* Allocate space for worker information. */
|
||||||
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
|
pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
|
||||||
|
|
||||||
|
@ -972,6 +983,7 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
char *tsnapspace;
|
char *tsnapspace;
|
||||||
char *asnapspace;
|
char *asnapspace;
|
||||||
char *tstatespace;
|
char *tstatespace;
|
||||||
|
char *reindexspace;
|
||||||
StringInfoData msgbuf;
|
StringInfoData msgbuf;
|
||||||
char *session_dsm_handle_space;
|
char *session_dsm_handle_space;
|
||||||
|
|
||||||
|
@ -1137,6 +1149,10 @@ ParallelWorkerMain(Datum main_arg)
|
||||||
/* Set ParallelMasterBackendId so we know how to address temp relations. */
|
/* Set ParallelMasterBackendId so we know how to address temp relations. */
|
||||||
ParallelMasterBackendId = fps->parallel_master_backend_id;
|
ParallelMasterBackendId = fps->parallel_master_backend_id;
|
||||||
|
|
||||||
|
/* Restore reindex state. */
|
||||||
|
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
|
||||||
|
RestoreReindexState(reindexspace);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We've initialized all of our state now; nothing should change
|
* We've initialized all of our state now; nothing should change
|
||||||
* hereafter.
|
* hereafter.
|
||||||
|
|
|
@ -86,6 +86,18 @@ typedef struct
|
||||||
tups_inserted;
|
tups_inserted;
|
||||||
} v_i_state;
|
} v_i_state;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Pointer-free representation of variables used when reindexing system
|
||||||
|
* catalogs; we use this to propagate those values to parallel workers.
|
||||||
|
*/
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
Oid currentlyReindexedHeap;
|
||||||
|
Oid currentlyReindexedIndex;
|
||||||
|
int numPendingReindexedIndexes;
|
||||||
|
Oid pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
|
||||||
|
} SerializedReindexState;
|
||||||
|
|
||||||
/* non-export function prototypes */
|
/* non-export function prototypes */
|
||||||
static bool relationHasPrimaryKey(Relation rel);
|
static bool relationHasPrimaryKey(Relation rel);
|
||||||
static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
|
static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
|
||||||
|
@ -3653,7 +3665,8 @@ reindex_relation(Oid relid, int flags, int options)
|
||||||
* When we are busy reindexing a system index, this code provides support
|
* When we are busy reindexing a system index, this code provides support
|
||||||
* for preventing catalog lookups from using that index. We also make use
|
* for preventing catalog lookups from using that index. We also make use
|
||||||
* of this to catch attempted uses of user indexes during reindexing of
|
* of this to catch attempted uses of user indexes during reindexing of
|
||||||
* those indexes.
|
* those indexes. This information is propagated to parallel workers;
|
||||||
|
* attempting to change it during a parallel operation is not permitted.
|
||||||
* ----------------------------------------------------------------
|
* ----------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -3719,6 +3732,8 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
|
||||||
static void
|
static void
|
||||||
ResetReindexProcessing(void)
|
ResetReindexProcessing(void)
|
||||||
{
|
{
|
||||||
|
if (IsInParallelMode())
|
||||||
|
elog(ERROR, "cannot modify reindex state during a parallel operation");
|
||||||
currentlyReindexedHeap = InvalidOid;
|
currentlyReindexedHeap = InvalidOid;
|
||||||
currentlyReindexedIndex = InvalidOid;
|
currentlyReindexedIndex = InvalidOid;
|
||||||
}
|
}
|
||||||
|
@ -3736,6 +3751,8 @@ SetReindexPending(List *indexes)
|
||||||
/* Reindexing is not re-entrant. */
|
/* Reindexing is not re-entrant. */
|
||||||
if (pendingReindexedIndexes)
|
if (pendingReindexedIndexes)
|
||||||
elog(ERROR, "cannot reindex while reindexing");
|
elog(ERROR, "cannot reindex while reindexing");
|
||||||
|
if (IsInParallelMode())
|
||||||
|
elog(ERROR, "cannot modify reindex state during a parallel operation");
|
||||||
pendingReindexedIndexes = list_copy(indexes);
|
pendingReindexedIndexes = list_copy(indexes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3746,6 +3763,8 @@ SetReindexPending(List *indexes)
|
||||||
static void
|
static void
|
||||||
RemoveReindexPending(Oid indexOid)
|
RemoveReindexPending(Oid indexOid)
|
||||||
{
|
{
|
||||||
|
if (IsInParallelMode())
|
||||||
|
elog(ERROR, "cannot modify reindex state during a parallel operation");
|
||||||
pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
|
pendingReindexedIndexes = list_delete_oid(pendingReindexedIndexes,
|
||||||
indexOid);
|
indexOid);
|
||||||
}
|
}
|
||||||
|
@ -3757,5 +3776,59 @@ RemoveReindexPending(Oid indexOid)
|
||||||
static void
|
static void
|
||||||
ResetReindexPending(void)
|
ResetReindexPending(void)
|
||||||
{
|
{
|
||||||
|
if (IsInParallelMode())
|
||||||
|
elog(ERROR, "cannot modify reindex state during a parallel operation");
|
||||||
pendingReindexedIndexes = NIL;
|
pendingReindexedIndexes = NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EstimateReindexStateSpace
|
||||||
|
* Estimate space needed to pass reindex state to parallel workers.
|
||||||
|
*/
|
||||||
|
extern Size
|
||||||
|
EstimateReindexStateSpace(void)
|
||||||
|
{
|
||||||
|
return offsetof(SerializedReindexState, pendingReindexedIndexes)
|
||||||
|
+ mul_size(sizeof(Oid), list_length(pendingReindexedIndexes));
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SerializeReindexState
|
||||||
|
* Serialize reindex state for parallel workers.
|
||||||
|
*/
|
||||||
|
extern void
|
||||||
|
SerializeReindexState(Size maxsize, char *start_address)
|
||||||
|
{
|
||||||
|
SerializedReindexState *sistate = (SerializedReindexState *) start_address;
|
||||||
|
int c = 0;
|
||||||
|
ListCell *lc;
|
||||||
|
|
||||||
|
sistate->currentlyReindexedHeap = currentlyReindexedHeap;
|
||||||
|
sistate->currentlyReindexedIndex = currentlyReindexedIndex;
|
||||||
|
sistate->numPendingReindexedIndexes = list_length(pendingReindexedIndexes);
|
||||||
|
foreach(lc, pendingReindexedIndexes)
|
||||||
|
sistate->pendingReindexedIndexes[c++] = lfirst_oid(lc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RestoreReindexState
|
||||||
|
* Restore reindex state in a parallel worker.
|
||||||
|
*/
|
||||||
|
extern void
|
||||||
|
RestoreReindexState(void *reindexstate)
|
||||||
|
{
|
||||||
|
SerializedReindexState *sistate = (SerializedReindexState *) reindexstate;
|
||||||
|
int c = 0;
|
||||||
|
MemoryContext oldcontext;
|
||||||
|
|
||||||
|
currentlyReindexedHeap = sistate->currentlyReindexedHeap;
|
||||||
|
currentlyReindexedIndex = sistate->currentlyReindexedIndex;
|
||||||
|
|
||||||
|
Assert(pendingReindexedIndexes == NIL);
|
||||||
|
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
|
||||||
|
for (c = 0; c < sistate->numPendingReindexedIndexes; ++c)
|
||||||
|
pendingReindexedIndexes =
|
||||||
|
lappend_oid(pendingReindexedIndexes,
|
||||||
|
sistate->pendingReindexedIndexes[c]);
|
||||||
|
MemoryContextSwitchTo(oldcontext);
|
||||||
|
}
|
||||||
|
|
|
@ -134,4 +134,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
|
||||||
extern bool ReindexIsProcessingIndex(Oid indexOid);
|
extern bool ReindexIsProcessingIndex(Oid indexOid);
|
||||||
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
|
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
|
||||||
|
|
||||||
|
extern Size EstimateReindexStateSpace(void);
|
||||||
|
extern void SerializeReindexState(Size maxsize, char *start_address);
|
||||||
|
extern void RestoreReindexState(void *reindexstate);
|
||||||
|
|
||||||
#endif /* INDEX_H */
|
#endif /* INDEX_H */
|
||||||
|
|
Loading…
Reference in New Issue