diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 282b44f87b..db6becfed5 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -130,6 +130,7 @@
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
 #define PARALLEL_VACUUM_KEY_WAL_USAGE		5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS		6
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -181,14 +182,6 @@ typedef struct LVShared
 	Oid			relid;
 	int			elevel;
 
-	/*
-	 * An indication for vacuum workers to perform either index vacuum or
-	 * index cleanup.  first_time is true only if for_cleanup is true and
-	 * bulk-deletion is not performed yet.
-	 */
-	bool		for_cleanup;
-	bool		first_time;
-
 	/*
 	 * Fields for both index vacuum and cleanup.
 	 *
@@ -226,33 +219,44 @@ typedef struct LVShared
 	 */
 	pg_atomic_uint32 active_nworkers;
 
-	/*
-	 * Variables to control parallel vacuum.  We have a bitmap to indicate
-	 * which index has stats in shared memory.  The set bit in the map
-	 * indicates that the particular index supports a parallel vacuum.
-	 */
-	pg_atomic_uint32 idx;		/* counter for vacuuming and clean up */
-	uint32		offset;			/* sizeof header incl. bitmap */
-	bits8		bitmap[FLEXIBLE_ARRAY_MEMBER];	/* bit map of NULLs */
-
-	/* Shared index statistics data follows at end of struct */
+	/* Counter for vacuuming and cleanup */
+	pg_atomic_uint32 idx;
 } LVShared;
 
-#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
-#define GetSharedIndStats(s) \
-	((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
-#define IndStatsIsNull(s, i) \
-	(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
+/* Status used during parallel index vacuum or cleanup */
+typedef enum LVParallelIndVacStatus
+{
+	PARALLEL_INDVAC_STATUS_INITIAL = 0,
+	PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+	PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+	PARALLEL_INDVAC_STATUS_COMPLETED
+} LVParallelIndVacStatus;
 
 /*
- * Struct for an index bulk-deletion statistic used for parallel vacuum. This
- * is allocated in the DSM segment.
+ * Struct for index vacuum statistics of an index used by parallel vacuum.
+ * This includes the status of the parallel index vacuum as well as the
+ * index statistics.
  */
-typedef struct LVSharedIndStats
+typedef struct LVParallelIndStats
 {
-	bool		updated;		/* are the stats updated? */
+	/*
+	 * The following two fields are set by the leader process before
+	 * executing parallel index vacuum or parallel index cleanup.  These
+	 * fields are not fixed for the entire VACUUM operation.  They are only
+	 * fixed for an individual parallel index vacuum and cleanup.
+	 *
+	 * parallel_workers_can_process is true if both the leader and workers
+	 * can process the index; otherwise only the leader can process it.
+	 */
+	LVParallelIndVacStatus status;
+	bool		parallel_workers_can_process;
+
+	/*
+	 * Individual worker or leader stores the result of index vacuum or
+	 * cleanup.
+	 */
+	bool		istat_updated;	/* are the stats updated? */
 	IndexBulkDeleteResult istat;
-} LVSharedIndStats;
+} LVParallelIndStats;
 
 /* Struct for maintaining a parallel vacuum state. */
 typedef struct LVParallelState
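
The per-index status above replaces LVShared's old for_cleanup/first_time flags: before each pass the leader stamps every index's slot with the work it wants done, and completion is tracked per index rather than per pass. A stand-alone sketch of that lifecycle (the demo_* names and simplified struct are illustrative, not part of the patch):

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Mirrors LVParallelIndVacStatus from the patch */
typedef enum { INITIAL, NEED_BULKDELETE, NEED_CLEANUP, COMPLETED } demo_status;

typedef struct { demo_status status; } demo_ind_stats;

/* Leader-side pass: stamp every slot, "process" it, then verify and reset */
static void demo_pass(demo_ind_stats *stats, int nindexes, bool vacuum)
{
    demo_status new_status = vacuum ? NEED_BULKDELETE : NEED_CLEANUP;

    for (int i = 0; i < nindexes; i++)
    {
        assert(stats[i].status == INITIAL); /* previous pass must have reset */
        stats[i].status = new_status;
    }

    /* ... leader and workers process indexes, each setting COMPLETED ... */
    for (int i = 0; i < nindexes; i++)
        stats[i].status = COMPLETED;

    /* Back to INITIAL so the next bulk-delete or cleanup pass can run */
    for (int i = 0; i < nindexes; i++)
    {
        assert(stats[i].status == COMPLETED); /* the patch elog(ERROR)s here */
        stats[i].status = INITIAL;
    }
}

int main(void)
{
    demo_ind_stats stats[3] = {0};

    demo_pass(stats, 3, true);   /* index vacuum pass */
    demo_pass(stats, 3, false);  /* index cleanup pass */
    printf("two passes completed\n");
    return 0;
}
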
@@ -262,12 +266,29 @@ typedef struct LVParallelState
 	/* Shared information among parallel vacuum workers */
 	LVShared   *lvshared;
 
+	/*
+	 * Shared index statistics among parallel vacuum workers.  The array
+	 * element is allocated for every index, even those indexes where
+	 * parallel index vacuuming is unsafe or not worthwhile (e.g.,
+	 * will_parallel_vacuum[] is false).  During parallel vacuum, the
+	 * IndexBulkDeleteResult of each index is kept in DSM and is copied into
+	 * local memory at the end of parallel vacuum.
+	 */
+	LVParallelIndStats *lvpindstats;
+
 	/* Points to buffer usage area in DSM */
 	BufferUsage *buffer_usage;
 
 	/* Points to WAL usage area in DSM */
 	WalUsage   *wal_usage;
 
+	/*
+	 * False if the index is a totally unsuitable target for all parallel
+	 * processing.  For example, the index could be smaller than the
+	 * min_parallel_index_scan_size cutoff.
+	 */
+	bool	   *will_parallel_vacuum;
+
 	/*
 	 * The number of indexes that support parallel index bulk-deletion and
 	 * parallel index cleanup respectively.
@@ -391,19 +412,14 @@ static int lazy_vacuum_heap_page(LVRelState *vacrel, BlockNumber blkno,
 static bool lazy_check_needs_freeze(Buffer buf, bool *hastup,
 									LVRelState *vacrel);
 static bool lazy_check_wraparound_failsafe(LVRelState *vacrel);
-static void do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel);
-static void do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel);
-static void do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers);
-static void do_parallel_processing(LVRelState *vacrel,
-								   LVShared *lvshared);
-static void do_serial_processing_for_unsafe_indexes(LVRelState *vacrel,
-													LVShared *lvshared);
-static IndexBulkDeleteResult *parallel_process_one_index(Relation indrel,
-														 IndexBulkDeleteResult *istat,
-														 LVShared *lvshared,
-														 LVSharedIndStats *shared_indstats,
-														 LVRelState *vacrel);
 static void lazy_cleanup_all_indexes(LVRelState *vacrel);
+static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum);
+static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+												 LVParallelIndStats *pindstats);
+static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel);
+static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+											  LVShared *shared,
+											  LVParallelIndStats *pindstats);
 static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel,
 													IndexBulkDeleteResult *istat,
 													double reltuples,
@@ -425,14 +441,13 @@ static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
 static int	vac_cmp_itemptr(const void *left, const void *right);
 static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 									 TransactionId *visibility_cutoff_xid,
 									 bool *all_frozen);
-static int	compute_parallel_vacuum_workers(LVRelState *vacrel,
-											int nrequested,
+static int	parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
 											bool *will_parallel_vacuum);
 static void update_index_statistics(LVRelState *vacrel);
-static void begin_parallel_vacuum(LVRelState *vacrel, int nrequested);
-static void end_parallel_vacuum(LVRelState *vacrel);
-static LVSharedIndStats *parallel_stats_for_idx(LVShared *lvshared, int getidx);
-static bool parallel_processing_is_safe(Relation indrel, LVShared *lvshared);
+static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested);
+static void parallel_vacuum_end(LVRelState *vacrel);
+static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+												   bool vacuum);
 static void vacuum_error_callback(void *arg);
 static void update_vacuum_error_info(LVRelState *vacrel,
 									 LVSavedErrInfo *saved_vacrel,
@@ -2237,7 +2252,7 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_vacuum_all_indexes(vacrel);
+		parallel_vacuum_process_all_indexes(vacrel, true);
 
 		/*
 		 * Do a postcheck to consider applying wraparound failsafe now.  Note
@@ -2610,99 +2625,92 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 	return false;
 }
 
-/*
- * Perform lazy_vacuum_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_vacuum_all_indexes(LVRelState *vacrel)
-{
-	/* Tell parallel workers to do index vacuuming */
-	vacrel->lps->lvshared->for_cleanup = false;
-	vacrel->lps->lvshared->first_time = false;
-
-	/*
-	 * We can only provide an approximate value of num_heap_tuples, at least
-	 * for now.  Matches serial VACUUM case.
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
-	vacrel->lps->lvshared->estimated_count = true;
-
-	do_parallel_vacuum_or_cleanup(vacrel,
-								  vacrel->lps->nindexes_parallel_bulkdel);
-}
-
-/*
- * Perform lazy_cleanup_all_indexes() steps in parallel
- */
-static void
-do_parallel_lazy_cleanup_all_indexes(LVRelState *vacrel)
-{
-	int			nworkers;
-
-	/*
-	 * If parallel vacuum is active we perform index cleanup with parallel
-	 * workers.
-	 *
-	 * Tell parallel workers to do index cleanup.
-	 */
-	vacrel->lps->lvshared->for_cleanup = true;
-	vacrel->lps->lvshared->first_time = (vacrel->num_index_scans == 0);
-
-	/*
-	 * Now we can provide a better estimate of total number of surviving
-	 * tuples (we assume indexes are more interested in that than in the
-	 * number of nominally live tuples).
-	 */
-	vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
-	vacrel->lps->lvshared->estimated_count =
-		(vacrel->tupcount_pages < vacrel->rel_pages);
-
-	/* Determine the number of parallel workers to launch */
-	if (vacrel->lps->lvshared->first_time)
-		nworkers = vacrel->lps->nindexes_parallel_cleanup +
-			vacrel->lps->nindexes_parallel_condcleanup;
-	else
-		nworkers = vacrel->lps->nindexes_parallel_cleanup;
-
-	do_parallel_vacuum_or_cleanup(vacrel, nworkers);
-}
-
 /*
  * Perform index vacuum or index cleanup with parallel workers.  This function
- * must be used by the parallel vacuum leader process.  The caller must set
- * lps->lvshared->for_cleanup to indicate whether to perform vacuum or
- * cleanup.
+ * must be used by the parallel vacuum leader process.
  */
 static void
-do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
+parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum)
 {
 	LVParallelState *lps = vacrel->lps;
+	LVParallelIndVacStatus new_status;
+	int			nworkers;
 
 	Assert(!IsParallelWorker());
 	Assert(ParallelVacuumIsActive(vacrel));
 	Assert(vacrel->nindexes > 0);
 
+	if (vacuum)
+	{
+		/*
+		 * We can only provide an approximate value of num_heap_tuples, at
+		 * least for now.  Matches serial VACUUM case.
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples;
+		vacrel->lps->lvshared->estimated_count = true;
+
+		new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = vacrel->lps->nindexes_parallel_bulkdel;
+	}
+	else
+	{
+		/*
+		 * We can provide a better estimate of the total number of surviving
+		 * tuples (we assume indexes are more interested in that than in the
+		 * number of nominally live tuples).
+		 */
+		vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples;
+		vacrel->lps->lvshared->estimated_count =
+			(vacrel->tupcount_pages < vacrel->rel_pages);
+
+		new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
+
+		/* Determine the number of parallel workers to launch */
+		nworkers = vacrel->lps->nindexes_parallel_cleanup;
+
+		/* Add conditionally parallel-aware indexes on the first call */
+		if (vacrel->num_index_scans == 0)
+			nworkers += vacrel->lps->nindexes_parallel_condcleanup;
+	}
+
 	/* The leader process will participate */
 	nworkers--;
 
 	/*
 	 * It is possible that parallel context is initialized with fewer workers
	 * than the number of indexes that need a separate worker in the current
-	 * phase, so we need to consider it.  See compute_parallel_vacuum_workers.
+	 * phase, so we need to consider it.  See
+	 * parallel_vacuum_compute_workers().
 	 */
 	nworkers = Min(nworkers, lps->pcxt->nworkers);
 
+	/*
+	 * Set index vacuum status and mark whether parallel vacuum workers can
+	 * process it.
+	 */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]);
+
+		Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+		pindstats->status = new_status;
+		pindstats->parallel_workers_can_process =
+			(lps->will_parallel_vacuum[i] &&
+			 parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i],
+													vacuum));
+	}
+
+	/* Reset the parallel index processing counter */
+	pg_atomic_write_u32(&(lps->lvshared->idx), 0);
+
 	/* Setup the shared cost-based vacuum delay and launch workers */
 	if (nworkers > 0)
 	{
+		/* Reinitialize parallel context to relaunch parallel workers */
 		if (vacrel->num_index_scans > 0)
-		{
-			/* Reset the parallel index processing counter */
-			pg_atomic_write_u32(&(lps->lvshared->idx), 0);
-
-			/* Reinitialize the parallel context to relaunch parallel workers */
 			ReinitializeParallelDSM(lps->pcxt);
-		}
 
 		/*
 		 * Set up shared cost balance and the number of active workers for
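
To make the branch above easier to eyeball, here is the same worker-count selection in isolation. This is a hypothetical compute_nworkers() with made-up counts; the real code reads the nindexes_parallel_* fields of LVParallelState:

#include <stdbool.h>
#include <stdio.h>

/* Mirrors the leader's worker-count selection in
 * parallel_vacuum_process_all_indexes(); all inputs are illustrative. */
static int compute_nworkers(bool vacuum, bool first_pass,
                            int n_bulkdel, int n_cleanup, int n_condcleanup,
                            int pcxt_nworkers)
{
    int nworkers;

    if (vacuum)
        nworkers = n_bulkdel;
    else
    {
        nworkers = n_cleanup;
        /* conditionally parallel-aware indexes join only the first cleanup */
        if (first_pass)
            nworkers += n_condcleanup;
    }

    nworkers--;                     /* the leader participates too */
    if (nworkers > pcxt_nworkers)
        nworkers = pcxt_nworkers;   /* Min(nworkers, lps->pcxt->nworkers) */
    return nworkers;
}

int main(void)
{
    /* 2 bulkdel-capable indexes, parallel context sized for 3 workers */
    printf("vacuum pass:  %d workers\n", compute_nworkers(true, true, 2, 1, 1, 3));
    /* the first cleanup pass picks up the conditional index as well */
    printf("cleanup pass: %d workers\n", compute_nworkers(false, true, 2, 1, 1, 3));
    return 0;
}
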
@@ -2735,28 +2743,28 @@ do_parallel_vacuum_or_cleanup(LVRelState *vacrel, int nworkers)
 		VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
 	}
 
-	if (lps->lvshared->for_cleanup)
-		ereport(elevel,
-				(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
-								 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
-								 lps->pcxt->nworkers_launched),
-						lps->pcxt->nworkers_launched, nworkers)));
-	else
+	if (vacuum)
 		ereport(elevel,
 				(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
 								 "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
 								 lps->pcxt->nworkers_launched),
 						lps->pcxt->nworkers_launched, nworkers)));
+	else
+		ereport(elevel,
+				(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+								 "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+								 lps->pcxt->nworkers_launched),
+						lps->pcxt->nworkers_launched, nworkers)));
 
 	/* Process the indexes that can be processed only by the leader process */
-	do_serial_processing_for_unsafe_indexes(vacrel, lps->lvshared);
+	parallel_vacuum_process_unsafe_indexes(vacrel);
 
 	/*
-	 * Join as a parallel worker.  The leader process alone processes all the
-	 * indexes in the case where no workers are launched.
+	 * Join as a parallel worker.  The leader process alone processes all
+	 * parallel-safe indexes in the case where no workers are launched.
 	 */
-	do_parallel_processing(vacrel, lps->lvshared);
+	parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats);
 
 	/*
 	 * Next, accumulate buffer and WAL usage.  (This must wait for the workers
@@ -2771,6 +2779,21 @@
 			InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
 	}
 
+	/*
+	 * Reset all index status back to initial (while checking that we have
+	 * processed all indexes).
+	 */
+	for (int i = 0; i < vacrel->nindexes; i++)
+	{
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[i]);
+
+		if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+			elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+				 RelationGetRelationName(vacrel->indrels[i]));
+
+		pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+	}
+
 	/*
 	 * Carry the shared balance value to heap scan and disable shared costing
 	 */
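
parallel_vacuum_process_safe_indexes(), next, hands out indexes via the shared idx counter: every participant fetch-and-adds until the counter runs past nindexes, so each index is claimed exactly once with no other coordination. The pattern in miniature, using C11 atomics and threads (where available) in place of pg_atomic_fetch_add_u32() and parallel workers; all names here are illustrative:

#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

#define NINDEXES 8

static atomic_uint next_idx;            /* plays the role of lvshared->idx */
static int claimed_by[NINDEXES];

static int worker(void *arg)
{
    int me = (int)(long) arg;

    for (;;)
    {
        unsigned idx = atomic_fetch_add(&next_idx, 1);  /* claim one index */

        if (idx >= NINDEXES)            /* done for all indexes? */
            break;
        claimed_by[idx] = me;           /* "process" index idx */
    }
    return 0;
}

int main(void)
{
    thrd_t workers[3];

    for (long i = 0; i < 3; i++)
        thrd_create(&workers[i], worker, (void *) i);
    for (int i = 0; i < 3; i++)
        thrd_join(workers[i], NULL);

    for (int i = 0; i < NINDEXES; i++)  /* every index claimed exactly once */
        printf("index %d -> worker %d\n", i, claimed_by[i]);
    return 0;
}
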
@@ -2787,7 +2810,8 @@
  * vacuum worker processes to process the indexes in parallel.
  */
 static void
-do_parallel_processing(LVRelState *vacrel, LVShared *lvshared)
+parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared,
+									 LVParallelIndStats *pindstats)
 {
 	/*
 	 * Increment the active worker count if we are able to launch any worker.
@@ -2799,39 +2823,28 @@
 	for (;;)
 	{
 		int			idx;
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
+		LVParallelIndStats *pis;
 
 		/* Get an index number to process */
-		idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
+		idx = pg_atomic_fetch_add_u32(&(shared->idx), 1);
 
 		/* Done for all indexes? */
 		if (idx >= vacrel->nindexes)
 			break;
 
-		/* Get the index statistics space from DSM, if any */
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-
-		/* Skip indexes not participating in parallelism */
-		if (shared_istat == NULL)
-			continue;
-
-		indrel = vacrel->indrels[idx];
+		pis = &(pindstats[idx]);
 
 		/*
-		 * Skip processing indexes that are unsafe for workers (these are
-		 * processed in do_serial_processing_for_unsafe_indexes() by leader)
+		 * Skip processing indexes that are unsafe for workers or are
+		 * unsuitable targets for parallel index vacuum (these are processed
+		 * in parallel_vacuum_process_unsafe_indexes() by the leader).
 		 */
-		if (!parallel_processing_is_safe(indrel, lvshared))
+		if (!pis->parallel_workers_can_process)
 			continue;
 
 		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
+		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
+										  shared, pis);
 	}
 
 	/*
@@ -2847,15 +2860,16 @@
  *
  * Handles index vacuuming (or index cleanup) for indexes that are not
 * parallel safe.  It's possible that this will vary for a given index, based
- * on details like whether we're performing for_cleanup processing right now.
+ * on details like whether we're performing index cleanup right now.
  *
  * Also performs processing of smaller indexes that fell under the size cutoff
- * enforced by compute_parallel_vacuum_workers().  These indexes never get a
- * slot for statistics in DSM.
+ * enforced by parallel_vacuum_compute_workers().
  */
 static void
-do_serial_processing_for_unsafe_indexes(LVRelState *vacrel, LVShared *lvshared)
+parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel)
 {
+	LVParallelState *lps = vacrel->lps;
+
 	Assert(!IsParallelWorker());
 
 	/*
@@ -2866,28 +2880,15 @@
 	for (int idx = 0; idx < vacrel->nindexes; idx++)
 	{
-		LVSharedIndStats *shared_istat;
-		Relation	indrel;
-		IndexBulkDeleteResult *istat;
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
 
-		shared_istat = parallel_stats_for_idx(lvshared, idx);
-		indrel = vacrel->indrels[idx];
-
-		/*
-		 * We're only here for the indexes that parallel workers won't
-		 * process.  Note that the shared_istat test ensures that we process
-		 * indexes that fell under initial size cutoff.
-		 */
-		if (shared_istat != NULL &&
-			parallel_processing_is_safe(indrel, lvshared))
+		/* Skip indexes that are safe for workers */
+		if (pindstats->parallel_workers_can_process)
 			continue;
 
 		/* Do vacuum or cleanup of the index */
-		istat = vacrel->indstats[idx];
-		vacrel->indstats[idx] = parallel_process_one_index(indrel, istat,
-														   lvshared,
-														   shared_istat,
-														   vacrel);
+		parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx],
										  lps->lvshared, pindstats);
 	}
 
 	/*
@@ -2904,29 +2905,37 @@
  * statistics returned from ambulkdelete and amvacuumcleanup to the DSM
 * segment.
 */
-static IndexBulkDeleteResult *
-parallel_process_one_index(Relation indrel,
-						   IndexBulkDeleteResult *istat,
-						   LVShared *lvshared,
-						   LVSharedIndStats *shared_istat,
-						   LVRelState *vacrel)
+static void
+parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel,
+								  LVShared *shared, LVParallelIndStats *pindstats)
 {
+	IndexBulkDeleteResult *istat = NULL;
 	IndexBulkDeleteResult *istat_res;
 
 	/*
 	 * Update the pointer to the corresponding bulk-deletion result if someone
 	 * has already updated it.
 	 */
-	if (shared_istat && shared_istat->updated && istat == NULL)
-		istat = &shared_istat->istat;
+	if (pindstats->istat_updated)
+		istat = &(pindstats->istat);
 
-	/* Do vacuum or cleanup of the index */
-	if (lvshared->for_cleanup)
-		istat_res = lazy_cleanup_one_index(indrel, istat, lvshared->reltuples,
-										   lvshared->estimated_count, vacrel);
-	else
-		istat_res = lazy_vacuum_one_index(indrel, istat, lvshared->reltuples,
-										  vacrel);
+	switch (pindstats->status)
+	{
+		case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+			istat_res = lazy_vacuum_one_index(indrel, istat,
+											  shared->reltuples, vacrel);
+			break;
+		case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+			istat_res = lazy_cleanup_one_index(indrel, istat,
+											   shared->reltuples,
+											   shared->estimated_count,
+											   vacrel);
+			break;
+		default:
+			elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+				 pindstats->status,
+				 RelationGetRelationName(indrel));
+	}
 
 	/*
 	 * Copy the index bulk-deletion result returned from ambulkdelete and
@@ -2940,19 +2949,20 @@ parallel_process_one_index,
 	 * Since all vacuum workers write the bulk-deletion result at different
 	 * slots we can write them without locking.
 	 */
-	if (shared_istat && !shared_istat->updated && istat_res != NULL)
+	if (!pindstats->istat_updated && istat_res != NULL)
 	{
-		memcpy(&shared_istat->istat, istat_res, sizeof(IndexBulkDeleteResult));
-		shared_istat->updated = true;
+		memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+		pindstats->istat_updated = true;
 
 		/* Free the locally-allocated bulk-deletion result */
 		pfree(istat_res);
-
-		/* return the pointer to the result from shared memory */
-		return &shared_istat->istat;
 	}
 
-	return istat_res;
+	/*
	 * Update the status to completed.  No need to lock here since each
+	 * worker touches a different index.
+	 */
+	pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 }
 
 /*
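
The istat handoff above is worth spelling out: the first pass that produces a result memcpy()s it into the index's DSM slot and sets istat_updated; later passes hand the shared copy back to the access method, which updates it in place. A toy rendition of that copy-once protocol (stub result type, not the patch's code):

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct { long tuples_removed; } demo_result; /* stands in for IndexBulkDeleteResult */

typedef struct
{
    bool        istat_updated;
    demo_result istat;          /* lives in "DSM" */
} demo_slot;

/* One vacuum or cleanup pass over a single index */
static void demo_process(demo_slot *slot, long removed_this_pass)
{
    demo_result *prev = slot->istat_updated ? &slot->istat : NULL;
    demo_result  local = { (prev ? prev->tuples_removed : 0) + removed_this_pass };

    if (!slot->istat_updated)
    {
        /* first result for this index: publish it in the shared slot */
        memcpy(&slot->istat, &local, sizeof(local));
        slot->istat_updated = true;
    }
    else
        slot->istat = local;    /* later passes update the shared copy */
}

int main(void)
{
    demo_slot slot = {0};

    demo_process(&slot, 100);   /* bulk-delete pass */
    demo_process(&slot, 7);     /* cleanup pass sees the earlier result */
    printf("total removed: %ld\n", slot.istat.tuples_removed);
    return 0;
}
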
@@ -2987,7 +2997,7 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 	else
 	{
 		/* Outsource everything to parallel variant */
-		do_parallel_lazy_cleanup_all_indexes(vacrel);
+		parallel_vacuum_process_all_indexes(vacrel, false);
 	}
 }
 
@@ -3520,7 +3530,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers)
 							vacrel->relname)));
 		}
 		else
-			begin_parallel_vacuum(vacrel, nworkers);
+			parallel_vacuum_begin(vacrel, nworkers);
 
 		/* If parallel mode started, vacrel->dead_items allocated in DSM */
 		if (ParallelVacuumIsActive(vacrel))
@@ -3552,7 +3562,7 @@ dead_items_cleanup(LVRelState *vacrel)
 	 * End parallel mode before updating index statistics as we cannot write
 	 * during parallel mode.
 	 */
-	end_parallel_vacuum(vacrel);
+	parallel_vacuum_end(vacrel);
 }
 
 /*
@@ -3758,7 +3768,7 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf,
 * vacuum.
 */
 static int
-compute_parallel_vacuum_workers(LVRelState *vacrel, int nrequested,
+parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested,
								bool *will_parallel_vacuum)
 {
 	int			nindexes_parallel = 0;
@@ -3781,6 +3791,7 @@
 		Relation	indrel = vacrel->indrels[idx];
 		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
 
+		/* Skip indexes that are not suitable targets for parallel index vacuum */
 		if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
 			RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
 			continue;
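
will_parallel_vacuum[] is the pass-independent filter shown above: an index that declares VACUUM_OPTION_NO_PARALLEL, or that is smaller than the min_parallel_index_scan_size cutoff, is excluded from parallelism for the whole VACUUM. A sketch of just that filter (the flag values and sizes are invented for the demo):

#include <stdbool.h>
#include <stdio.h>

/* Invented stand-ins for the real VACUUM_OPTION_* flags */
#define DEMO_OPTION_NO_PARALLEL      0
#define DEMO_OPTION_PARALLEL_BULKDEL (1 << 0)

static bool suitable_for_parallel(unsigned vacoptions, unsigned nblocks,
                                  unsigned min_parallel_index_scan_size)
{
    /* mirrors the skip test in parallel_vacuum_compute_workers() */
    return vacoptions != DEMO_OPTION_NO_PARALLEL &&
           nblocks >= min_parallel_index_scan_size;
}

int main(void)
{
    /* 512-block cutoff: a tiny but parallel-capable index is still skipped */
    printf("%d\n", suitable_for_parallel(DEMO_OPTION_PARALLEL_BULKDEL, 40, 512));   /* 0 */
    printf("%d\n", suitable_for_parallel(DEMO_OPTION_PARALLEL_BULKDEL, 2048, 512)); /* 1 */
    printf("%d\n", suitable_for_parallel(DEMO_OPTION_NO_PARALLEL, 2048, 512));      /* 0 */
    return 0;
}
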
@@ -3855,7 +3866,7 @@ update_index_statistics(LVRelState *vacrel)
 * VACUUM is currently active.
 */
 static void
-begin_parallel_vacuum(LVRelState *vacrel, int nrequested)
+parallel_vacuum_begin(LVRelState *vacrel, int nrequested)
 {
 	LVParallelState *lps;
 	Relation   *indrels = vacrel->indrels;
@@ -3863,10 +3874,12 @@
 	ParallelContext *pcxt;
 	LVShared   *shared;
 	LVDeadItems *dead_items;
+	LVParallelIndStats *pindstats;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
 	bool	   *will_parallel_vacuum;
 	int			max_items;
+	Size		est_pindstats_len;
 	Size		est_shared_len;
 	Size		est_dead_items_len;
 	int			nindexes_mwm = 0;
@@ -3884,8 +3897,7 @@
 	 * Compute the number of parallel vacuum workers to launch
 	 */
 	will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
-	parallel_workers = compute_parallel_vacuum_workers(vacrel,
-													   nrequested,
+	parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested,
													   will_parallel_vacuum);
 	if (parallel_workers <= 0)
 	{
@@ -3901,48 +3913,21 @@
 								parallel_workers);
 	Assert(pcxt->nworkers > 0);
 	lps->pcxt = pcxt;
+	lps->will_parallel_vacuum = will_parallel_vacuum;
+
+	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+	est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes);
+	shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
-	est_shared_len = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		Relation	indrel = indrels[idx];
-		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
-
-		/*
-		 * Cleanup option should be either disabled, always performing in
-		 * parallel or conditionally performing in parallel.
-		 */
-		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
-			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
-		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
-
-		/* Skip indexes that don't participate in parallel vacuum */
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		if (indrel->rd_indam->amusemaintenanceworkmem)
-			nindexes_mwm++;
-
-		est_shared_len = add_size(est_shared_len, sizeof(LVSharedIndStats));
-
-		/*
-		 * Remember the number of indexes that support parallel operation for
-		 * each phase.
-		 */
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
-			lps->nindexes_parallel_bulkdel++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
-			lps->nindexes_parallel_cleanup++;
-		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
-			lps->nindexes_parallel_condcleanup++;
-	}
+	est_shared_len = sizeof(LVShared);
 	shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
 	/* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
 	max_items = dead_items_max_items(vacrel);
-	est_dead_items_len = MAXALIGN(max_items_to_alloc_size(max_items));
+	est_dead_items_len = max_items_to_alloc_size(max_items);
 	shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
@@ -3973,6 +3958,41 @@
 
 	InitializeParallelDSM(pcxt);
 
+	/* Prepare index vacuum stats */
+	pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len);
+	for (int idx = 0; idx < nindexes; idx++)
+	{
+		Relation	indrel = indrels[idx];
+		uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+		/*
+		 * Cleanup option should be either disabled, always performing in
+		 * parallel or conditionally performing in parallel.
+		 */
+		Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+			   ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+		Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+		if (!will_parallel_vacuum[idx])
+			continue;
+
+		if (indrel->rd_indam->amusemaintenanceworkmem)
+			nindexes_mwm++;
+
+		/*
+		 * Remember the number of indexes that support parallel operation for
+		 * each phase.
+		 */
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+			lps->nindexes_parallel_bulkdel++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+			lps->nindexes_parallel_cleanup++;
+		if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+			lps->nindexes_parallel_condcleanup++;
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats);
+	lps->lvpindstats = pindstats;
+
 	/* Prepare shared information */
 	shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
 	MemSet(shared, 0, est_shared_len);
@@ -3986,21 +4006,6 @@
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
-	shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
-
-	/*
-	 * Initialize variables for shared index statistics, set NULL bitmap and
-	 * the size of stats for each index.
-	 */
-	memset(shared->bitmap, 0x00, BITMAPLEN(nindexes));
-	for (int idx = 0; idx < nindexes; idx++)
-	{
-		if (!will_parallel_vacuum[idx])
-			continue;
-
-		/* Set NOT NULL as this index does support parallelism */
-		shared->bitmap[idx >> 3] |= 1 << (idx & 0x07);
-	}
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	lps->lvshared = shared;
@@ -4038,8 +4043,6 @@
 							 PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
 	}
 
-	pfree(will_parallel_vacuum);
-
 	/* Success -- set dead_items and lps in leader's vacrel state */
 	vacrel->dead_items = dead_items;
 	vacrel->lps = lps;
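
Allocating one fixed-size LVParallelIndStats per index -- rather than the old variable-length tail behind LVShared -- is what lets parallel_vacuum_begin() size the chunk with a single mul_size() and every reader use plain array indexing; the removed parallel_stats_for_idx() had to walk a NULL bitmap instead. A side-by-side sketch of the two lookups (simplified structs, illustrative only):

#include <stdio.h>

typedef struct { int istat_updated; } demo_stats;

/* New scheme: a dense array in DSM, one element per index */
static demo_stats *lookup_dense(demo_stats *arr, int idx)
{
    return &arr[idx];                   /* O(1), no bitmap needed */
}

/* Old scheme (sketched): only participating indexes have a slot, so a
 * lookup must count the set bits in front of idx */
static demo_stats *lookup_sparse(unsigned char *bitmap, demo_stats *slots, int idx)
{
    demo_stats *p = slots;

    if (!(bitmap[idx >> 3] & (1 << (idx & 0x07))))
        return NULL;                    /* index not participating */
    for (int i = 0; i < idx; i++)
        if (bitmap[i >> 3] & (1 << (i & 0x07)))
            p++;
    return p;
}

int main(void)
{
    demo_stats dense[4] = {{0}};
    demo_stats slots[2] = {{0}};
    unsigned char bitmap[1] = { 0x0A }; /* only indexes 1 and 3 participate */

    printf("dense idx 3 -> slot %ld\n", (long) (lookup_dense(dense, 3) - dense));
    printf("sparse idx 3 -> slot %ld\n", (long) (lookup_sparse(bitmap, slots, 3) - slots));
    return 0;
}
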
@@ -4055,7 +4058,7 @@
 * context, but that won't be safe (see ExitParallelMode).
 */
 static void
-end_parallel_vacuum(LVRelState *vacrel)
+parallel_vacuum_end(LVRelState *vacrel)
 {
 	IndexBulkDeleteResult **indstats = vacrel->indstats;
 	LVParallelState *lps = vacrel->lps;
@@ -4066,21 +4069,12 @@
 	/* Copy the updated statistics */
 	for (int idx = 0; idx < nindexes; idx++)
 	{
-		LVSharedIndStats *shared_istat;
+		LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]);
 
-		shared_istat = parallel_stats_for_idx(lps->lvshared, idx);
-
-		/*
-		 * Skip index -- it must have been processed by the leader, from
-		 * inside do_serial_processing_for_unsafe_indexes()
-		 */
-		if (shared_istat == NULL)
-			continue;
-
-		if (shared_istat->updated)
+		if (pindstats->istat_updated)
 		{
 			indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
-			memcpy(indstats[idx], &shared_istat->istat, sizeof(IndexBulkDeleteResult));
+			memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult));
 		}
 		else
 			indstats[idx] = NULL;
@@ -4090,72 +4084,43 @@
 	ExitParallelMode();
 
 	/* Deactivate parallel vacuum */
+	pfree(lps->will_parallel_vacuum);
 	pfree(lps);
 	vacrel->lps = NULL;
 }
 
 /*
- * Return shared memory statistics for index at offset 'getidx', if any
- *
- * Returning NULL indicates that compute_parallel_vacuum_workers() determined
- * that the index is a totally unsuitable target for all parallel processing
- * up front.  For example, the index could be < min_parallel_index_scan_size
- * cutoff.
- */
-static LVSharedIndStats *
-parallel_stats_for_idx(LVShared *lvshared, int getidx)
-{
-	char	   *p;
-
-	if (IndStatsIsNull(lvshared, getidx))
-		return NULL;
-
-	p = (char *) GetSharedIndStats(lvshared);
-	for (int idx = 0; idx < getidx; idx++)
-	{
-		if (IndStatsIsNull(lvshared, idx))
-			continue;
-
-		p += sizeof(LVSharedIndStats);
-	}
-
-	return (LVSharedIndStats *) p;
-}
-
-/*
- * Returns false, if the given index can't participate in parallel index
- * vacuum or parallel index cleanup
+ * Returns false if the given index can't participate in the next execution of
+ * parallel index vacuum or parallel index cleanup.
 */
 static bool
-parallel_processing_is_safe(Relation indrel, LVShared *lvshared)
+parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel,
+									   bool vacuum)
 {
-	uint8		vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+	uint8		vacoptions;
 
-	/* first_time must be true only if for_cleanup is true */
-	Assert(lvshared->for_cleanup || !lvshared->first_time);
+	vacoptions = indrel->rd_indam->amparallelvacuumoptions;
 
-	if (lvshared->for_cleanup)
-	{
-		/* Skip, if the index does not support parallel cleanup */
-		if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
-			return false;
+	/* In parallel vacuum case, check if it supports parallel bulk-deletion */
+	if (vacuum)
+		return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
 
-		/*
-		 * Skip, if the index supports parallel cleanup conditionally, but we
-		 * have already processed the index (for bulkdelete).  See the
-		 * comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
-		 * when indexes support parallel cleanup conditionally.
-		 */
-		if (!lvshared->first_time &&
-			((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
-			return false;
-	}
-	else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
-	{
-		/* Skip if the index does not support parallel bulk deletion */
+	/* Not safe if the index does not support parallel cleanup */
+	if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+		return false;
+
+	/*
+	 * Not safe if the index supports parallel cleanup conditionally, but we
+	 * have already processed the index (for bulkdelete).  We do this to
+	 * avoid the need to invoke workers when parallel index cleanup doesn't
+	 * need to scan the index.  See the comments for option
+	 * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
+	 * parallel cleanup conditionally.
+	 */
+	if (vacrel->num_index_scans > 0 &&
+		((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
 		return false;
-	}
 
 	return true;
 }
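
parallel_vacuum_index_is_parallel_safe() reduces to a couple of bitmask tests against amparallelvacuumoptions. The same logic as a stand-alone rendition, with invented flag values (the real VACUUM_OPTION_* flags live in vacuum.h):

#include <stdbool.h>
#include <stdio.h>

/* Invented values; stand-ins for the real VACUUM_OPTION_* flags */
#define OPT_PARALLEL_BULKDEL      (1 << 0)
#define OPT_PARALLEL_COND_CLEANUP (1 << 1)
#define OPT_PARALLEL_CLEANUP      (1 << 2)

static bool is_parallel_safe(unsigned vacoptions, bool vacuum, int num_index_scans)
{
    if (vacuum)                 /* bulk-delete pass */
        return (vacoptions & OPT_PARALLEL_BULKDEL) != 0;

    /* cleanup pass: must support some form of parallel cleanup */
    if ((vacoptions & (OPT_PARALLEL_CLEANUP | OPT_PARALLEL_COND_CLEANUP)) == 0)
        return false;

    /* conditional cleanup only participates if no bulkdelete has run yet */
    if (num_index_scans > 0 && (vacoptions & OPT_PARALLEL_COND_CLEANUP))
        return false;

    return true;
}

int main(void)
{
    /* btree-like: bulk-delete plus conditional cleanup */
    unsigned btree = OPT_PARALLEL_BULKDEL | OPT_PARALLEL_COND_CLEANUP;

    printf("cleanup, no scans yet:  %d\n", is_parallel_safe(btree, false, 0)); /* 1 */
    printf("cleanup, after bulkdel: %d\n", is_parallel_safe(btree, false, 2)); /* 0 */
    printf("bulk-delete pass:       %d\n", is_parallel_safe(btree, true, 0));  /* 1 */
    return 0;
}
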
@@ -4171,6 +4136,7 @@
 {
 	Relation	rel;
 	Relation   *indrels;
+	LVParallelIndStats *lvpindstats;
 	LVShared   *lvshared;
 	LVDeadItems *dead_items;
 	BufferUsage *buffer_usage;
@@ -4190,10 +4156,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 										   false);
 	elevel = lvshared->elevel;
 
-	if (lvshared->for_cleanup)
-		elog(DEBUG1, "starting parallel vacuum worker for cleanup");
-	else
-		elog(DEBUG1, "starting parallel vacuum worker for bulk delete");
+	elog(DEBUG1, "starting parallel vacuum worker");
 
 	/* Set debug_query_string for individual workers */
 	sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
@@ -4214,6 +4177,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
 	Assert(nindexes > 0);
 
+	/* Set index statistics */
+	lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc,
+														PARALLEL_VACUUM_KEY_INDEX_STATS,
+														false);
+
 	/* Set dead_items space (set as worker's vacrel dead_items below) */
 	dead_items = (LVDeadItems *) shm_toc_lookup(toc,
 												PARALLEL_VACUUM_KEY_DEAD_ITEMS,
@@ -4259,7 +4227,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	InstrStartParallelQuery();
 
 	/* Process indexes to perform vacuum/cleanup */
-	do_parallel_processing(&vacrel, lvshared);
+	parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats);
 
 	/* Report buffer/WAL usage during parallel execution */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f41ef0d2bc..0c61ccbdd0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1307,6 +1307,8 @@ LSEG
 LUID
 LVDeadTuples
 LVPagePruneState
+LVParallelIndStats
+LVParallelIndVacStatus
 LVParallelState
 LVRelState
 LVSavedErrInfo