Improve parallel vacuum implementation.

Previously, in parallel vacuum, we allocated a shmem area of
IndexBulkDeleteResult only for indexes where parallel index vacuuming is
safe, and used a null-bitmap in the shmem area to locate them. This logic was
too complicated for the small benefit of saving only a few bits per index.

In this commit, we allocate a dedicated shmem area for an array of
LVParallelIndStats, each element of which includes a parallel-safety flag,
the index vacuum status, and the IndexBulkDeleteResult. There is one array
element for every index, even those indexes where parallel index vacuuming
is unsafe or not worthwhile. This commit makes the code clearer by removing
all bitmap-related code.
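
To see the simplification concretely, compare the two lookup paths. The "before" half is condensed from the removed parallel_stats_for_idx() shown further down in this diff; the "after" half is the new direct array access; variable declarations are omitted:

    /* Before: stats were allocated only for parallel-safe indexes, so
     * locating index i's slot required a bitmap test plus a walk over
     * every preceding non-NULL slot. */
    if (IndStatsIsNull(lvshared, i))
        return NULL;            /* no shared stats for this index */
    p = (char *) GetSharedIndStats(lvshared);
    for (int idx = 0; idx < i; idx++)
    {
        if (IndStatsIsNull(lvshared, idx))
            continue;
        p += sizeof(LVSharedIndStats);
    }
    stats = (LVSharedIndStats *) p;

    /* After: one LVParallelIndStats slot per index, accessed directly. */
    pindstats = &(lps->lvpindstats[i]);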

Also, add a check of each index's vacuum status after parallel index vacuum
to make sure that all indexes have been processed.

Finally, rename parallel vacuum functions to parallel_vacuum_* for
consistency.

Author: Masahiko Sawada, based on suggestions by Andres Freund
Reviewed-by: Hou Zhijie, Amit Kapila
Discussion: https://www.postgresql.org/message-id/20211030212101.ae3qcouatwmy7tbr%40alap3.anarazel.de
Amit Kapila 2021-12-15 07:58:19 +05:30
parent 7acd01015c
commit 22bd3cbe0c
2 changed files with 292 additions and 322 deletions

src/backend/access/heap/vacuumlazy.c

@@ -130,6 +130,7 @@
#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
#define PARALLEL_VACUUM_KEY_INDEX_STATS 6
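
The new key participates in the usual shm_toc handshake. Condensed from the hunks later in this diff, the leader sizes, allocates, and publishes the array under the key, and each worker looks it up by the same key:

    /* Leader, in parallel_vacuum_begin() */
    est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
    shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    /* ... after InitializeParallelDSM(pcxt) ... */
    pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);

    /* Worker, in parallel_vacuum_main() */
    lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
                                                        PARALLEL_VACUUM_KEY_INDEX_STATS,
                                                        false);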
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
@@ -181,14 +182,6 @@ typedef struct LVShared
Oid relid;
int elevel;
/*
* An indication for vacuum workers to perform either index vacuum or
* index cleanup. first_time is true only if for_cleanup is true and
* bulk-deletion is not performed yet.
*/
bool for_cleanup;
bool first_time;
/*
* Fields for both index vacuum and cleanup.
*
@@ -226,33 +219,44 @@ typedef struct LVShared
*/
pg_atomic_uint32 active_nworkers;
/*
* Variables to control parallel vacuum. We have a bitmap to indicate
* which index has stats in shared memory. The set bit in the map
* indicates that the particular index supports a parallel vacuum.
*/
pg_atomic_uint32 idx; /* counter for vacuuming and clean up */
uint32 offset; /* sizeof header incl. bitmap */
bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */
/* Shared index statistics data follows at end of struct */
/* Counter for vacuuming and cleanup */
pg_atomic_uint32 idx;
} LVShared;
#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
#define GetSharedIndStats(s) \
((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
#define IndStatsIsNull(s, i) \
(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
/* Status used during parallel index vacuum or cleanup */
typedef enum LVParallelIndVacStatus
{
PARALLEL_INDVAC_STATUS_INITIAL = 0,
PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
PARALLEL_INDVAC_STATUS_COMPLETED
} LVParallelIndVacStatus;
/*
* Struct for an index bulk-deletion statistic used for parallel vacuum. This
* is allocated in the DSM segment.
* Struct for per-index vacuum statistics used by parallel vacuum.
* This includes the status of the parallel index vacuum as well as the index statistics.
*/
typedef struct LVSharedIndStats
typedef struct LVParallelIndStats
{
bool updated; /* are the stats updated? */
/*
* The following two fields are set by leader process before executing
* parallel index vacuum or parallel index cleanup. These fields are not
* fixed for the entire VACUUM operation. They are only fixed for an
* individual parallel index vacuum and cleanup.
*
* parallel_workers_can_process is true if both leader and worker can
* process the index, otherwise only leader can process it.
*/
LVParallelIndVacStatus status;
bool parallel_workers_can_process;
/*
* Individual worker or leader stores the result of index vacuum or
* cleanup.
*/
bool istat_updated; /* are the stats updated? */
IndexBulkDeleteResult istat;
} LVSharedIndStats;
} LVParallelIndStats;
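
Taken together, status gives each index a small per-pass state machine. Roughly, paraphrasing the control flow that appears later in this diff:

    /* Leader arms every slot before launching workers
     * (parallel_vacuum_process_all_indexes): */
    Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
    pindstats->status = vacuum ? PARALLEL_INDVAC_STATUS_NEED_BULKDELETE
                               : PARALLEL_INDVAC_STATUS_NEED_CLEANUP;

    /* Whichever process handles the index marks it done
     * (parallel_vacuum_process_one_index): */
    pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;

    /* Afterwards the leader verifies every slot and resets it: */
    if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
        elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
             RelationGetRelationName(vacrel->indrels[i]));
    pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;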
/* Struct for maintaining a parallel vacuum state. */
typedef struct LVParallelState
@@ -262,12 +266,29 @@ typedef struct LVParallelState
/* Shared information among parallel vacuum workers */
LVShared *lvshared;
/*
* Shared index statistics among parallel vacuum workers. The array
* element is allocated for every index, even those indexes where parallel
* index vacuuming is unsafe or not worthwhile (e.g.,
* will_parallel_vacuum[] is false). During parallel vacuum,
* IndexBulkDeleteResult of each index is kept in DSM and is copied into
* local memory at the end of parallel vacuum.
*/
LVParallelIndStats *lvpindstats;
/* Points to buffer usage area in DSM */
BufferUsage *buffer_usage;
/* Points to WAL usage area in DSM */
WalUsage *wal_usage;
/*
* False if the index is a totally unsuitable target for all parallel
* processing. For example, the index could be smaller than the
* min_parallel_index_scan_size cutoff.
*/
bool *will_parallel_vacuum;
/*
* The number of indexes that support parallel index bulk-deletion and
* parallel index cleanup respectively.
@@ -391,19 +412,14 @@ static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
LVRelState *vacrel);
static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
static void do_parallel_processing(LVRelState *vacrel,
LVShared *lvshared);
static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
LVShared *lvshared);
static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
IndexBulkDeleteResult *istat,
LVShared *lvshared,
LVSharedIndStats *shared_indstats,
LVRelState *vacrel);
static void lazy_cleanup_all_indexes(LVRelState *vacrel);
static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
LVParallelIndStats *pindstats);
static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
LVShared *shared,
LVParallelIndStats *pindstats);
static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
IndexBulkDeleteResult *istat,
double reltuples,
@@ -425,14 +441,13 @@ static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int vac_cmp_itemptr(const void *left, const void *right);
static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
static int compute_parallel_vacuum_workers(LVRelState *vacrel,
int nrequested,
static int parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
bool *will_parallel_vacuum);
static void update_index_statistics(LVRelState *vacrel);
static void begin_parallel_vacuum(LVRelState *vacrel, int nrequested);
static void end_parallel_vacuum(LVRelState *vacrel);
static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
static void parallel_vacuum_end(LVRelState *vacrel);
static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
bool vacuum);
static void vacuum_error_callback(void *arg);
static void update_vacuum_error_info(LVRelState *vacrel,
LVSavedErrInfo *saved_vacrel,
@@ -2237,7 +2252,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
else
{
/* Outsource everything to parallel variant */
do_parallel_lazy_vacuum_all_indexes(vacrel);
parallel_vacuum_process_all_indexes(vacrel, true);
/*
* Do a postcheck to consider applying wraparound failsafe now. Note
@@ -2610,99 +2625,92 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
return false;
}
/*
* Perform lazy_vacuum_all_indexes() steps in parallel
*/
static void
do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
{
/* Tell parallel workers to do index vacuuming */
vacrel->lps->lvshared->for_cleanup = false;
vacrel->lps->lvshared->first_time = false;
/*
* We can only provide an approximate value of num_heap_tuples, at least
* for now. Matches serial VACUUM case.
*/
vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
vacrel->lps->lvshared->estimated_count = true;
do_parallel_vacuum_or_cleanup(vacrel,
vacrel->lps->nindexes_parallel_bulkdel);
}
/*
* Perform lazy_cleanup_all_indexes() steps in parallel
*/
static void
do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
{
int nworkers;
/*
* If parallel vacuum is active we perform index cleanup with parallel
* workers.
*
* Tell parallel workers to do index cleanup.
*/
vacrel->lps->lvshared->for_cleanup = true;
vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
/*
* Now we can provide a better estimate of total number of surviving
* tuples (we assume indexes are more interested in that than in the
* number of nominally live tuples).
*/
vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
vacrel->lps->lvshared->estimated_count =
(vacrel->tupcount_pages < vacrel->rel_pages);
/* Determine the number of parallel workers to launch */
if (vacrel->lps->lvshared->first_time)
nworkers = vacrel->lps->nindexes_parallel_cleanup +
vacrel->lps->nindexes_parallel_condcleanup;
else
nworkers = vacrel->lps->nindexes_parallel_cleanup;
do_parallel_vacuum_or_cleanup(vacrel, nworkers);
}
/*
* Perform index vacuum or index cleanup with parallel workers. This function
* must be used by the parallel vacuum leader process. The caller must set
* lps->lvshared->for_cleanup to indicate whether to perform vacuum or
* cleanup.
* must be used by the parallel vacuum leader process.
*/
static void
do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
{
LVParallelState *lps = vacrel->lps;
LVParallelIndVacStatus new_status;
int nworkers;
Assert(!IsParallelWorker());
Assert(ParallelVacuumIsActive(vacrel));
Assert(vacrel->nindexes > 0);
if (vacuum)
{
/*
* We can only provide an approximate value of num_heap_tuples, at
* least for now. Matches serial VACUUM case.
*/
vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
vacrel->lps->lvshared->estimated_count = true;
new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
/* Determine the number of parallel workers to launch */
nworkers = vacrel->lps->nindexes_parallel_bulkdel;
}
else
{
/*
* We can provide a better estimate of total number of surviving
* tuples (we assume indexes are more interested in that than in the
* number of nominally live tuples).
*/
vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
vacrel->lps->lvshared->estimated_count =
(vacrel->tupcount_pages < vacrel->rel_pages);
new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
/* Determine the number of parallel workers to launch */
nworkers = vacrel->lps->nindexes_parallel_cleanup;
/* Add conditionally parallel-aware indexes on the first-time call */
if (vacrel->num_index_scans == 0)
nworkers += vacrel->lps->nindexes_parallel_condcleanup;
}
/* The leader process will participate */
nworkers--;
/*
* It is possible that parallel context is initialized with fewer workers
* than the number of indexes that need a separate worker in the current
* phase, so we need to consider it. See compute_parallel_vacuum_workers.
* phase, so we need to consider it. See
* parallel_vacuum_compute_workers().
*/
nworkers = Min(nworkers, lps->pcxt->nworkers);
/*
* Set each index's vacuum status and mark whether parallel vacuum workers
* can process it.
*/
for (int i = 0; i < vacrel->nindexes; i++)
{
LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
pindstats->status = new_status;
pindstats->parallel_workers_can_process =
(lps->will_parallel_vacuum[i] &&
parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
vacuum));
}
/* Reset the parallel index processing counter */
pg_atomic_write_u32(&(lps->lvshared->idx), 0);
/* Setup the shared cost-based vacuum delay and launch workers */
if (nworkers > 0)
{
/* Reinitialize parallel context to relaunch parallel workers */
if (vacrel->num_index_scans > 0)
{
/* Reset the parallel index processing counter */
pg_atomic_write_u32(&(lps->lvshared->idx), 0);
/* Reinitialize the parallel context to relaunch parallel workers */
ReinitializeParallelDSM(lps->pcxt);
}
/*
* Set up shared cost balance and the number of active workers for
@@ -2735,28 +2743,28 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
}
if (lps->lvshared->for_cleanup)
ereport(elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
"launched %d parallel vacuum workers for index cleanup (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
else
if (vacuum)
ereport(elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
"launched %d parallel vacuum workers for index vacuuming (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
else
ereport(elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
"launched %d parallel vacuum workers for index cleanup (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
}
/* Process the indexes that can be processed by only leader process */
do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
parallel_vacuum_process_unsafe_indexes(vacrel);
/*
* Join as a parallel worker. The leader process alone processes all the
* indexes in the case where no workers are launched.
* Join as a parallel worker. The leader process alone processes all
* parallel-safe indexes in the case where no workers are launched.
*/
do_parallel_processing(vacrel, lps->lvshared);
parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
/*
* Next, accumulate buffer and WAL usage. (This must wait for the workers
@@ -2771,6 +2779,21 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
}
/*
* Reset all index status back to initial (while checking that we have
* processed all indexes).
*/
for (int i = 0; i < vacrel->nindexes; i++)
{
LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
RelationGetRelationName(vacrel->indrels[i]));
pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
}
/*
* Carry the shared balance value to heap scan and disable shared costing
*/
@@ -2787,7 +2810,8 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
* vacuum worker processes to process the indexes in parallel.
*/
static void
do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
LVParallelIndStats *pindstats)
{
/*
* Increment the active worker count if we are able to launch any worker.
@@ -2799,39 +2823,28 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
for (;;)
{
int idx;
LVSharedIndStats *shared_istat;
Relation indrel;
IndexBulkDeleteResult *istat;
LVParallelIndStats *pis;
/* Get an index number to process */
idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
/* Done for all indexes? */
if (idx >= vacrel->nindexes)
break;
/* Get the index statistics space from DSM, if any */
shared_istat = parallel_stats_for_idx(lvshared, idx);
/* Skip indexes not participating in parallelism */
if (shared_istat == NULL)
continue;
indrel = vacrel->indrels[idx];
pis = &(pindstats[idx]);
/*
* Skip processing indexes that are unsafe for workers (these are
* processed in do_serial_processing_for_unsafe_indexes() by leader)
* Skip processing any index that is unsafe for workers or is an
* unsuitable target for parallel index vacuum (these are processed by
* the leader in parallel_vacuum_process_unsafe_indexes()).
*/
if (!parallel_processing_is_safe(indrel, lvshared))
if (!pis->parallel_workers_can_process)
continue;
/* Do vacuum or cleanup of the index */
istat = vacrel->indstats[idx];
vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
lvshared,
shared_istat,
vacrel);
parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
shared, pis);
}
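
The loop above is the whole work-distribution scheme: pg_atomic_fetch_add_u32() on the shared counter hands each participant (the leader joins in as well) a unique index number, so no lock is needed and no index can be claimed twice. Reduced to its core:

    for (;;)
    {
        int idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);

        if (idx >= vacrel->nindexes)
            break;              /* every index has been claimed */
        if (!pindstats[idx].parallel_workers_can_process)
            continue;           /* left for the leader's serial pass */

        parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
                                          shared, &(pindstats[idx]));
    }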
/*
@@ -2847,15 +2860,16 @@ do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
*
* Handles index vacuuming (or index cleanup) for indexes that are not
* parallel safe. It's possible that this will vary for a given index, based
* on details like whether we're performing for_cleanup processing right now.
* on details like whether we're performing index cleanup right now.
*
* Also performs processing of smaller indexes that fell under the size cutoff
* enforced by compute_parallel_vacuum_workers(). These indexes never get a
* slot for statistics in DSM.
* enforced by parallel_vacuum_compute_workers().
*/
static void
do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
{
LVParallelState *lps = vacrel->lps;
Assert(!IsParallelWorker());
/*
@@ -2866,28 +2880,15 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
for (int idx = 0; idx < vacrel->nindexes; idx++)
{
LVSharedIndStats *shared_istat;
Relation indrel;
IndexBulkDeleteResult *istat;
LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
shared_istat = parallel_stats_for_idx(lvshared, idx);
indrel = vacrel->indrels[idx];
/*
* We're only here for the indexes that parallel workers won't
* process. Note that the shared_istat test ensures that we process
* indexes that fell under initial size cutoff.
*/
if (shared_istat != NULL &&
parallel_processing_is_safe(indrel, lvshared))
/* Skip indexes that are safe for workers */
if (pindstats->parallel_workers_can_process)
continue;
/* Do vacuum or cleanup of the index */
istat = vacrel->indstats[idx];
vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
lvshared,
shared_istat,
vacrel);
parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
lps->lvshared, pindstats);
}
/*
@@ -2904,29 +2905,37 @@ do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
* statistics returned from ambulkdelete and amvacuumcleanup to the DSM
* segment.
*/
static IndexBulkDeleteResult *
parallel_process_one_index(Relation indrel,
IndexBulkDeleteResult *istat,
LVShared *lvshared,
LVSharedIndStats *shared_istat,
LVRelState *vacrel)
static void
parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
LVShared *shared, LVParallelIndStats *pindstats)
{
IndexBulkDeleteResult *istat = NULL;
IndexBulkDeleteResult *istat_res;
/*
* Update the pointer to the corresponding bulk-deletion result if someone
* has already updated it
*/
if (shared_istat && shared_istat->updated && istat == NULL)
istat = &shared_istat->istat;
if (pindstats->istat_updated)
istat = &(pindstats->istat);
/* Do vacuum or cleanup of the index */
if (lvshared->for_cleanup)
istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
lvshared->estimated_count, vacrel);
else
istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
vacrel);
switch (pindstats->status)
{
case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
istat_res = lazy_vacuum_one_index(indrel, istat,
shared->reltuples, vacrel);
break;
case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
istat_res = lazy_cleanup_one_index(indrel, istat,
shared->reltuples,
shared->estimated_count,
vacrel);
break;
default:
elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
pindstats->status,
RelationGetRelationName(indrel));
}
/*
* Copy the index bulk-deletion result returned from ambulkdelete and
@@ -2940,19 +2949,20 @@ parallel_process_one_index(Relation indrel,
* Since all vacuum workers write the bulk-deletion result at different
* slots we can write them without locking.
*/
if (shared_istat && !shared_istat->updated && istat_res != NULL)
if (!pindstats->istat_updated && istat_res != NULL)
{
memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
shared_istat->updated = true;
memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
pindstats->istat_updated = true;
/* Free the locally-allocated bulk-deletion result */
pfree(istat_res);
/* return the pointer to the result from shared memory */
return &shared_istat->istat;
}
return istat_res;
/*
* Update the status to completed. No need to lock here since each worker
* touches different indexes.
*/
pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
}
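
Note the round trip the statistics make: the first bulkdelete/cleanup call on an index returns a locally palloc'd result that is memcpy'd into the index's DSM slot; later calls hand that slot back to the access method; and parallel_vacuum_end() copies the slot into leader-local memory before the DSM segment is destroyed. In outline (assembled from this hunk and the parallel_vacuum_end() hunk below):

    /* parallel_vacuum_process_one_index(): persist the first result */
    if (!pindstats->istat_updated && istat_res != NULL)
    {
        memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
        pindstats->istat_updated = true;
        pfree(istat_res);
    }

    /* parallel_vacuum_end(): copy the stats back out before detaching */
    if (pindstats->istat_updated)
    {
        indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
        memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
    }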
/*
@@ -2987,7 +2997,7 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
else
{
/* Outsource everything to parallel variant */
do_parallel_lazy_cleanup_all_indexes(vacrel);
parallel_vacuum_process_all_indexes(vacrel, false);
}
}
@@ -3520,7 +3530,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
vacrel->relname)));
}
else
begin_parallel_vacuum(vacrel, nworkers);
parallel_vacuum_begin(vacrel, nworkers);
/* If parallel mode started, vacrel->dead_items allocated in DSM */
if (ParallelVacuumIsActive(vacrel))
@@ -3552,7 +3562,7 @@ dead_items_cleanup(LVRelState *vacrel)
* End parallel mode before updating index statistics as we cannot write
* during parallel mode.
*/
end_parallel_vacuum(vacrel);
parallel_vacuum_end(vacrel);
}
/*
@@ -3758,7 +3768,7 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
* vacuum.
*/
static int
compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
bool *will_parallel_vacuum)
{
int nindexes_parallel = 0;
@@ -3781,6 +3791,7 @@ compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
Relation indrel = vacrel->indrels[idx];
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
/* Skip index that is not a suitable target for parallel index vacuum */
if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
continue;
@@ -3855,7 +3866,7 @@ update_index_statistics(LVRelState *vacrel)
* VACUUM is currently active.
*/
static void
begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
{
LVParallelState *lps;
Relation *indrels = vacrel->indrels;
@@ -3863,10 +3874,12 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
ParallelContext *pcxt;
LVShared *shared;
LVDeadItems *dead_items;
LVParallelIndStats *pindstats;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
bool *will_parallel_vacuum;
int max_items;
Size est_pindstats_len;
Size est_shared_len;
Size est_dead_items_len;
int nindexes_mwm = 0;
@@ -3884,8 +3897,7 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
* Compute the number of parallel vacuum workers to launch
*/
will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
parallel_workers = compute_parallel_vacuum_workers(vacrel,
nrequested,
parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
will_parallel_vacuum);
if (parallel_workers <= 0)
{
@@ -3901,48 +3913,21 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
parallel_workers);
Assert(pcxt->nworkers > 0);
lps->pcxt = pcxt;
lps->will_parallel_vacuum = will_parallel_vacuum;
/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
est_shared_len = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
for (int idx = 0; idx < nindexes; idx++)
{
Relation indrel = indrels[idx];
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
/*
* Cleanup option should be either disabled, always performing in
* parallel or conditionally performing in parallel.
*/
Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
/* Skip indexes that don't participate in parallel vacuum */
if (!will_parallel_vacuum[idx])
continue;
if (indrel->rd_indam->amusemaintenanceworkmem)
nindexes_mwm++;
est_shared_len = add_size(est_shared_len, sizeof(LVSharedIndStats));
/*
* Remember the number of indexes that support parallel operation for
* each phase.
*/
if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
lps->nindexes_parallel_bulkdel++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
lps->nindexes_parallel_cleanup++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
lps->nindexes_parallel_condcleanup++;
}
est_shared_len = sizeof(LVShared);
shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
max_items = dead_items_max_items(vacrel);
est_dead_items_len = MAXALIGN(max_items_to_alloc_size(max_items));
est_dead_items_len = max_items_to_alloc_size(max_items);
shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
@@ -3973,6 +3958,41 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
InitializeParallelDSM(pcxt);
/* Prepare index vacuum stats */
pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
for (int idx = 0; idx < nindexes; idx++)
{
Relation indrel = indrels[idx];
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
/*
* Cleanup option should be either disabled, always performed in
* parallel, or conditionally performed in parallel.
*/
Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
if (!will_parallel_vacuum[idx])
continue;
if (indrel->rd_indam->amusemaintenanceworkmem)
nindexes_mwm++;
/*
* Remember the number of indexes that support parallel operation for
* each phase.
*/
if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
lps->nindexes_parallel_bulkdel++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
lps->nindexes_parallel_cleanup++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
lps->nindexes_parallel_condcleanup++;
}
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
lps->lvpindstats = pindstats;
/* Prepare shared information */
shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
MemSet(shared, 0, est_shared_len);
@@ -3986,21 +4006,6 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
pg_atomic_init_u32(&(shared->cost_balance), 0);
pg_atomic_init_u32(&(shared->active_nworkers), 0);
pg_atomic_init_u32(&(shared->idx), 0);
shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
/*
* Initialize variables for shared index statistics, set NULL bitmap and
* the size of stats for each index.
*/
memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
for (int idx = 0; idx < nindexes; idx++)
{
if (!will_parallel_vacuum[idx])
continue;
/* Set NOT NULL as this index does support parallelism */
shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
}
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
lps->lvshared = shared;
@@ -4038,8 +4043,6 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
}
pfree(will_parallel_vacuum);
/* Success -- set dead_items and lps in leader's vacrel state */
vacrel->dead_items = dead_items;
vacrel->lps = lps;
@@ -4055,7 +4058,7 @@ begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
* context, but that won't be safe (see ExitParallelMode).
*/
static void
end_parallel_vacuum(LVRelState *vacrel)
parallel_vacuum_end(LVRelState *vacrel)
{
IndexBulkDeleteResult **indstats = vacrel->indstats;
LVParallelState *lps = vacrel->lps;
@@ -4066,21 +4069,12 @@ end_parallel_vacuum(LVRelState *vacrel)
/* Copy the updated statistics */
for (int idx = 0; idx < nindexes; idx++)
{
LVSharedIndStats *shared_istat;
LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
/*
* Skip index -- it must have been processed by the leader, from
* inside do_serial_processing_for_unsafe_indexes()
*/
if (shared_istat == NULL)
continue;
if (shared_istat->updated)
if (pindstats->istat_updated)
{
indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
}
else
indstats[idx] = NULL;
@@ -4090,72 +4084,43 @@ end_parallel_vacuum(LVRelState *vacrel)
ExitParallelMode();
/* Deactivate parallel vacuum */
pfree(lps->will_parallel_vacuum);
pfree(lps);
vacrel->lps = NULL;
}
/*
* Return shared memory statistics for index at offset 'getidx', if any
*
* Returning NULL indicates that compute_parallel_vacuum_workers() determined
* that the index is a totally unsuitable target for all parallel processing
* up front. For example, the index could be < min_parallel_index_scan_size
* cutoff.
*/
static LVSharedIndStats *
parallel_stats_for_idx(LVShared *lvshared, int getidx)
{
char *p;
if (IndStatsIsNull(lvshared, getidx))
return NULL;
p = (char *) GetSharedIndStats(lvshared);
for (int idx = 0; idx < getidx; idx++)
{
if (IndStatsIsNull(lvshared, idx))
continue;
p += sizeof(LVSharedIndStats);
}
return (LVSharedIndStats *) p;
}
/*
* Returns false if the given index can't participate in parallel index
* vacuum or parallel index cleanup
* Returns false if the given index can't participate in the next execution of
* parallel index vacuum or parallel index cleanup.
*/
static bool
parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
bool vacuum)
{
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
uint8 vacoptions;
/* first_time must be true only if for_cleanup is true */
Assert(lvshared->for_cleanup || !lvshared->first_time);
vacoptions = indrel->rd_indam->amparallelvacuumoptions;
if (lvshared->for_cleanup)
{
/* Skip, if the index does not support parallel cleanup */
if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
return false;
/* In parallel vacuum case, check if it supports parallel bulk-deletion */
if (vacuum)
return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
/*
* Skip, if the index supports parallel cleanup conditionally, but we
* have already processed the index (for bulkdelete). See the
* comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
* when indexes support parallel cleanup conditionally.
*/
if (!lvshared->first_time &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
return false;
}
else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
{
/* Skip if the index does not support parallel bulk deletion */
/* Not safe if the index does not support parallel cleanup */
if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
return false;
/*
* Not safe if the index supports parallel cleanup conditionally, but we
* have already processed the index (for bulkdelete). We do this to avoid
* the need to invoke workers when parallel index cleanup doesn't need to
* scan the index. See the comments for option
* VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
* parallel cleanup conditionally.
*/
if (vacrel->num_index_scans > 0 &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
return false;
}
return true;
}
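
As a worked example: btree advertises VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP in its amparallelvacuumoptions, so for a btree index the function resolves as follows (a summary, not code from the commit):

    /* vacuum = true:  bulkdelete is parallel-safe          -> returns true
     * vacuum = false, num_index_scans == 0:
     *     conditional cleanup still applies                -> returns true
     * vacuum = false, after a bulkdelete pass:
     *     workers would have nothing useful to do          -> returns false */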
@@ -4171,6 +4136,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
Relation rel;
Relation *indrels;
LVParallelIndStats *lvpindstats;
LVShared *lvshared;
LVDeadItems *dead_items;
BufferUsage *buffer_usage;
@@ -4190,10 +4156,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
false);
elevel = lvshared->elevel;
if (lvshared->for_cleanup)
elog(DEBUG1, "starting parallel vacuum worker for cleanup");
else
elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
elog(DEBUG1, "starting parallel vacuum worker");
/* Set debug_query_string for individual workers */
sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
@@ -4214,6 +4177,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
Assert(nindexes > 0);
/* Set index statistics */
lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
PARALLEL_VACUUM_KEY_INDEX_STATS,
false);
/* Set dead_items space (set as worker's vacrel dead_items below) */
dead_items = (LVDeadItems *) shm_toc_lookup(toc,
PARALLEL_VACUUM_KEY_DEAD_ITEMS,
@@ -4259,7 +4227,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
InstrStartParallelQuery();
/* Process indexes to perform vacuum/cleanup */
do_parallel_processing(&vacrel, lvshared);
parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
/* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);

src/tools/pgindent/typedefs.list

@@ -1307,6 +1307,8 @@ LSEG
LUID
LVDeadTuples
LVPagePruneState
LVParallelIndStats
LVParallelIndVacStatus
LVParallelState
LVRelState
LVSavedErrInfo