diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index d8f1217504..cd603e6aa4 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -40,7 +40,6 @@ #include "access/heapam_xlog.h" #include "access/htup_details.h" #include "access/multixact.h" -#include "access/parallel.h" #include "access/transam.h" #include "access/visibilitymap.h" #include "access/xact.h" @@ -120,23 +119,11 @@ */ #define PREFETCH_SIZE ((BlockNumber) 32) -/* - * DSM keys for parallel vacuum. Unlike other parallel execution code, since - * we don't need to worry about DSM keys conflicting with plan_node_id we can - * use small integers. - */ -#define PARALLEL_VACUUM_KEY_SHARED 1 -#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2 -#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 -#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 - /* * Macro to check if we are in a parallel vacuum. If true, we are in the * parallel mode and the DSM segment is initialized. */ -#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL) +#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL) /* Phases of vacuum during which we report error context. */ typedef enum @@ -149,135 +136,6 @@ typedef enum VACUUM_ERRCB_PHASE_TRUNCATE } VacErrPhase; -/* - * Shared information among parallel workers. So this is allocated in the DSM - * segment. - */ -typedef struct LVShared -{ - /* - * Target table relid and log level. These fields are not modified during - * the lazy vacuum. - */ - Oid relid; - int elevel; - - /* - * Fields for both index vacuum and cleanup. - * - * reltuples is the total number of input heap tuples. We set either old - * live tuples in the index vacuum case or the new live tuples in the - * index cleanup case. - * - * estimated_count is true if reltuples is an estimated value. (Note that - * reltuples could be -1 in this case, indicating we have no idea.) - */ - double reltuples; - bool estimated_count; - - /* - * In single process lazy vacuum we could consume more memory during index - * vacuuming or cleanup apart from the memory for heap scanning. In - * parallel vacuum, since individual vacuum workers can consume memory - * equal to maintenance_work_mem, the new maintenance_work_mem for each - * worker is set such that the parallel operation doesn't consume more - * memory than single process lazy vacuum. - */ - int maintenance_work_mem_worker; - - /* - * Shared vacuum cost balance. During parallel vacuum, - * VacuumSharedCostBalance points to this value and it accumulates the - * balance of each parallel vacuum worker. - */ - pg_atomic_uint32 cost_balance; - - /* - * Number of active parallel workers. This is used for computing the - * minimum threshold of the vacuum cost balance before a worker sleeps for - * cost-based delay. - */ - pg_atomic_uint32 active_nworkers; - - /* Counter for vacuuming and cleanup */ - pg_atomic_uint32 idx; -} LVShared; - -/* Status used during parallel index vacuum or cleanup */ -typedef enum LVParallelIndVacStatus -{ - PARALLEL_INDVAC_STATUS_INITIAL = 0, - PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, - PARALLEL_INDVAC_STATUS_NEED_CLEANUP, - PARALLEL_INDVAC_STATUS_COMPLETED -} LVParallelIndVacStatus; - -/* - * Struct for index vacuum statistics of an index that is used for parallel vacuum. - * This includes the status of parallel index vacuum as well as index statistics. 
- */ -typedef struct LVParallelIndStats -{ - /* - * The following two fields are set by leader process before executing - * parallel index vacuum or parallel index cleanup. These fields are not - * fixed for the entire VACUUM operation. They are only fixed for an - * individual parallel index vacuum and cleanup. - * - * parallel_workers_can_process is true if both leader and worker can - * process the index, otherwise only leader can process it. - */ - LVParallelIndVacStatus status; - bool parallel_workers_can_process; - - /* - * Individual worker or leader stores the result of index vacuum or - * cleanup. - */ - bool istat_updated; /* are the stats updated? */ - IndexBulkDeleteResult istat; -} LVParallelIndStats; - -/* Struct for maintaining a parallel vacuum state. */ -typedef struct LVParallelState -{ - ParallelContext *pcxt; - - /* Shared information among parallel vacuum workers */ - LVShared *lvshared; - - /* - * Shared index statistics among parallel vacuum workers. The array - * element is allocated for every index, even those indexes where parallel - * index vacuuming is unsafe or not worthwhile (e.g., - * will_parallel_vacuum[] is false). During parallel vacuum, - * IndexBulkDeleteResult of each index is kept in DSM and is copied into - * local memory at the end of parallel vacuum. - */ - LVParallelIndStats *lvpindstats; - - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* Points to WAL usage area in DSM */ - WalUsage *wal_usage; - - /* - * False if the index is totally unsuitable target for all parallel - * processing. For example, the index could be < - * min_parallel_index_scan_size cutoff. - */ - bool *will_parallel_vacuum; - - /* - * The number of indexes that support parallel index bulk-deletion and - * parallel index cleanup respectively. 
- */ - int nindexes_parallel_bulkdel; - int nindexes_parallel_cleanup; - int nindexes_parallel_condcleanup; -} LVParallelState; - typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -295,9 +153,9 @@ typedef struct LVRelState bool do_index_cleanup; bool do_rel_truncate; - /* Buffer access strategy and parallel state */ + /* Buffer access strategy and parallel vacuum state */ BufferAccessStrategy bstrategy; - LVParallelState *lps; + ParallelVacuumState *pvs; /* rel's initial relfrozenxid and relminmxid */ TransactionId relfrozenxid; @@ -399,13 +257,6 @@ static bool lazy_check_needs_freeze(Buffer buf, bool *hastup, LVRelState *vacrel); static bool lazy_check_wraparound_failsafe(LVRelState *vacrel); static void lazy_cleanup_all_indexes(LVRelState *vacrel); -static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum); -static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared, - LVParallelIndStats *pindstats); -static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel); -static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel, - LVShared *shared, - LVParallelIndStats *pindstats); static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat, double reltuples, @@ -419,18 +270,11 @@ static bool should_attempt_truncation(LVRelState *vacrel); static void lazy_truncate_heap(LVRelState *vacrel); static BlockNumber count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected); -static int dead_items_max_items(LVRelState *vacrel); static void dead_items_alloc(LVRelState *vacrel, int nworkers); static void dead_items_cleanup(LVRelState *vacrel); static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); -static int parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum); static void update_index_statistics(LVRelState *vacrel); -static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested); -static void parallel_vacuum_end(LVRelState *vacrel); -static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel, - bool vacuum); static void vacuum_error_callback(void *arg); static void update_vacuum_error_info(LVRelState *vacrel, LVSavedErrInfo *saved_vacrel, @@ -1601,7 +1445,8 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive) /* * Free resources managed by dead_items_alloc. This will end parallel - * mode when needed (it must end before we update index statistics). + * mode when needed (it must end before updating index statistics as we + * can't write in parallel mode). */ dead_items_cleanup(vacrel); @@ -2066,7 +1911,6 @@ lazy_vacuum(LVRelState *vacrel) /* Should not end up here with no indexes */ Assert(vacrel->nindexes > 0); - Assert(!IsParallelWorker()); Assert(vacrel->lpdead_item_pages > 0); if (!vacrel->do_index_vacuuming) @@ -2195,7 +2039,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) { bool allindexes = true; - Assert(!IsParallelWorker()); Assert(vacrel->nindexes > 0); Assert(vacrel->do_index_vacuuming); Assert(vacrel->do_index_cleanup); @@ -2235,7 +2078,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - parallel_vacuum_process_all_indexes(vacrel, true); + parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples, + vacrel->num_index_scans); /* * Do a postcheck to consider applying wraparound failsafe now. 
Note @@ -2608,353 +2452,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) return false; } -/* - * Perform index vacuum or index cleanup with parallel workers. This function - * must be used by the parallel vacuum leader process. - */ -static void -parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum) -{ - LVParallelState *lps = vacrel->lps; - LVParallelIndVacStatus new_status; - int nworkers; - - Assert(!IsParallelWorker()); - Assert(ParallelVacuumIsActive(vacrel)); - Assert(vacrel->nindexes > 0); - - if (vacuum) - { - /* - * We can only provide an approximate value of num_heap_tuples, at - * least for now. Matches serial VACUUM case. - */ - vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples; - vacrel->lps->lvshared->estimated_count = true; - - new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE; - - /* Determine the number of parallel workers to launch */ - nworkers = vacrel->lps->nindexes_parallel_bulkdel; - } - else - { - /* - * We can provide a better estimate of total number of surviving - * tuples (we assume indexes are more interested in that than in the - * number of nominally live tuples). - */ - vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples; - vacrel->lps->lvshared->estimated_count = - (vacrel->tupcount_pages < vacrel->rel_pages); - - new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP; - - /* Determine the number of parallel workers to launch */ - nworkers = vacrel->lps->nindexes_parallel_cleanup; - - /* Add conditionally parallel-aware indexes if in the first time call */ - if (vacrel->num_index_scans == 0) - nworkers += vacrel->lps->nindexes_parallel_condcleanup; - } - - /* The leader process will participate */ - nworkers--; - - /* - * It is possible that parallel context is initialized with fewer workers - * than the number of indexes that need a separate worker in the current - * phase, so we need to consider it. See - * parallel_vacuum_compute_workers(). - */ - nworkers = Min(nworkers, lps->pcxt->nworkers); - - /* - * Set index vacuum status and mark whether parallel vacuum worker can - * process it. - */ - for (int i = 0; i < vacrel->nindexes; i++) - { - LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]); - - Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL); - pindstats->status = new_status; - pindstats->parallel_workers_can_process = - (lps->will_parallel_vacuum[i] & - parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i], - vacuum)); - } - - /* Reset the parallel index processing counter */ - pg_atomic_write_u32(&(lps->lvshared->idx), 0); - - /* Setup the shared cost-based vacuum delay and launch workers */ - if (nworkers > 0) - { - /* Reinitialize parallel context to relaunch parallel workers */ - if (vacrel->num_index_scans > 0) - ReinitializeParallelDSM(lps->pcxt); - - /* - * Set up shared cost balance and the number of active workers for - * vacuum delay. We need to do this before launching workers as - * otherwise, they might not see the updated values for these - * parameters. - */ - pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance); - pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0); - - /* - * The number of workers can vary between bulkdelete and cleanup - * phase. - */ - ReinitializeParallelWorkers(lps->pcxt, nworkers); - - LaunchParallelWorkers(lps->pcxt); - - if (lps->pcxt->nworkers_launched > 0) - { - /* - * Reset the local cost values for leader backend as we have - * already accumulated the remaining balance of heap. 
- */ - VacuumCostBalance = 0; - VacuumCostBalanceLocal = 0; - - /* Enable shared cost balance for leader backend */ - VacuumSharedCostBalance = &(lps->lvshared->cost_balance); - VacuumActiveNWorkers = &(lps->lvshared->active_nworkers); - } - - if (vacuum) - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", - "launched %d parallel vacuum workers for index vacuuming (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - else - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", - "launched %d parallel vacuum workers for index cleanup (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - } - - /* Process the indexes that can be processed by only leader process */ - parallel_vacuum_process_unsafe_indexes(vacrel); - - /* - * Join as a parallel worker. The leader process alone processes all - * parallel-safe indexes in the case where no workers are launched. - */ - parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats); - - /* - * Next, accumulate buffer and WAL usage. (This must wait for the workers - * to finish, or we might get incomplete data.) - */ - if (nworkers > 0) - { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(lps->pcxt); - - for (int i = 0; i < lps->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]); - } - - /* - * Reset all index status back to initial (while checking that we have - * processed all indexes). - */ - for (int i = 0; i < vacrel->nindexes; i++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[i]); - - if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED) - elog(ERROR, "parallel index vacuum on index \"%s\" is not completed", - RelationGetRelationName(vacrel->indrels[i])); - - pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL; - } - - /* - * Carry the shared balance value to heap scan and disable shared costing - */ - if (VacuumSharedCostBalance) - { - VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); - VacuumSharedCostBalance = NULL; - VacuumActiveNWorkers = NULL; - } -} - -/* - * Index vacuum/cleanup routine used by the leader process and parallel - * vacuum worker processes to process the indexes in parallel. - */ -static void -parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared, - LVParallelIndStats *pindstats) -{ - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - /* Loop until all indexes are vacuumed */ - for (;;) - { - int idx; - LVParallelIndStats *pis; - - /* Get an index number to process */ - idx = pg_atomic_fetch_add_u32(&(shared->idx), 1); - - /* Done for all indexes? */ - if (idx >= vacrel->nindexes) - break; - - pis = &(pindstats[idx]); - - /* - * Skip processing index that is unsafe for workers or has an - * unsuitable target for parallel index vacuum (this is processed in - * parallel_vacuum_process_unsafe_indexes() by the leader). - */ - if (!pis->parallel_workers_can_process) - continue; - - /* Do vacuum or cleanup of the index */ - parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx], - shared, pis); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. 
- */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Perform parallel processing of indexes in leader process. - * - * Handles index vacuuming (or index cleanup) for indexes that are not - * parallel safe. It's possible that this will vary for a given index, based - * on details like whether we're performing index cleanup right now. - * - * Also performs processing of smaller indexes that fell under the size cutoff - * enforced by parallel_vacuum_compute_workers(). - */ -static void -parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel) -{ - LVParallelState *lps = vacrel->lps; - - Assert(!IsParallelWorker()); - - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]); - - /* Skip, indexes that are safe for workers */ - if (pindstats->parallel_workers_can_process) - continue; - - /* Do vacuum or cleanup of the index */ - parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx], - lps->lvshared, pindstats); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. - */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Vacuum or cleanup index either by leader process or by one of the worker - * process. After processing the index this function copies the index - * statistics returned from ambulkdelete and amvacuumcleanup to the DSM - * segment. - */ -static void -parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel, - LVShared *shared, LVParallelIndStats *pindstats) -{ - IndexBulkDeleteResult *istat = NULL; - IndexBulkDeleteResult *istat_res; - - /* - * Update the pointer to the corresponding bulk-deletion result if someone - * has already updated it - */ - if (pindstats->istat_updated) - istat = &(pindstats->istat); - - switch (pindstats->status) - { - case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: - istat_res = lazy_vacuum_one_index(indrel, istat, - shared->reltuples, vacrel); - break; - case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: - istat_res = lazy_cleanup_one_index(indrel, istat, - shared->reltuples, - shared->estimated_count, - vacrel); - break; - default: - elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", - pindstats->status, - RelationGetRelationName(indrel)); - } - - /* - * Copy the index bulk-deletion result returned from ambulkdelete and - * amvacuumcleanup to the DSM segment if it's the first cycle because they - * allocate locally and it's possible that an index will be vacuumed by a - * different vacuum process the next cycle. Copying the result normally - * happens only the first time an index is vacuumed. For any additional - * vacuum pass, we directly point to the result on the DSM segment and - * pass it to vacuum index APIs so that workers can update it directly. - * - * Since all vacuum workers write the bulk-deletion result at different - * slots we can write them without locking. - */ - if (!pindstats->istat_updated && istat_res != NULL) - { - memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult)); - pindstats->istat_updated = true; - - /* Free the locally-allocated bulk-deletion result */ - pfree(istat_res); - } - - /* - * Update the status to completed. No need to lock here since each worker - * touches different indexes. 
- */ - pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED; -} - /* * lazy_cleanup_all_indexes() -- cleanup all indexes of relation. */ static void lazy_cleanup_all_indexes(LVRelState *vacrel) { - Assert(!IsParallelWorker()); Assert(vacrel->nindexes > 0); /* Report that we are now cleaning up indexes */ @@ -2980,7 +2483,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - parallel_vacuum_process_all_indexes(vacrel, false); + parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, + vacrel->num_index_scans, + (vacrel->tupcount_pages < vacrel->rel_pages)); } } @@ -3409,8 +2914,6 @@ dead_items_max_items(LVRelState *vacrel) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; - Assert(!IsParallelWorker()); - if (vacrel->nindexes > 0) { BlockNumber rel_pages = vacrel->rel_pages; @@ -3448,6 +2951,9 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) VacDeadItems *dead_items; int max_items; + max_items = dead_items_max_items(vacrel); + Assert(max_items >= MaxHeapTuplesPerPage); + /* * Initialize state for a parallel vacuum. As of now, only one worker can * be used for an index, so we invoke parallelism only if there are at @@ -3471,15 +2977,20 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->relname))); } else - parallel_vacuum_begin(vacrel, nworkers); + vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, + vacrel->nindexes, nworkers, + max_items, elevel, + vacrel->bstrategy); - /* If parallel mode started, vacrel->dead_items allocated in DSM */ + /* If parallel mode started, dead_items space is allocated in DSM */ if (ParallelVacuumIsActive(vacrel)) + { + vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs); return; + } } /* Serial VACUUM case */ - max_items = dead_items_max_items(vacrel); dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items)); dead_items->max_items = max_items; dead_items->num_items = 0; @@ -3499,11 +3010,9 @@ dead_items_cleanup(LVRelState *vacrel) return; } - /* - * End parallel mode before updating index statistics as we cannot write - * during parallel mode. - */ - parallel_vacuum_end(vacrel); + /* End parallel mode */ + parallel_vacuum_end(vacrel->pvs, vacrel->indstats); + vacrel->pvs = NULL; } /* @@ -3627,77 +3136,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, return all_visible; } -/* - * Compute the number of parallel worker processes to request. Both index - * vacuum and index cleanup can be executed with parallel workers. The index - * is eligible for parallel vacuum iff its size is greater than - * min_parallel_index_scan_size as invoking workers for very small indexes - * can hurt performance. - * - * nrequested is the number of parallel workers that user requested. If - * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. - */ -static int -parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum) -{ - int nindexes_parallel = 0; - int nindexes_parallel_bulkdel = 0; - int nindexes_parallel_cleanup = 0; - int parallel_workers; - - /* - * We don't allow performing parallel operation in standalone backend or - * when parallelism is disabled. 
- */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) - return 0; - - /* - * Compute the number of indexes that can participate in parallel vacuum. - */ - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - Relation indrel = vacrel->indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* Skip index that is not a suitable target for parallel index vacuum */ - if (vacoptions == VACUUM_OPTION_NO_PARALLEL || - RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) - continue; - - will_parallel_vacuum[idx] = true; - - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - nindexes_parallel_bulkdel++; - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - nindexes_parallel_cleanup++; - } - - nindexes_parallel = Max(nindexes_parallel_bulkdel, - nindexes_parallel_cleanup); - - /* The leader process takes one index */ - nindexes_parallel--; - - /* No index supports parallel vacuum */ - if (nindexes_parallel <= 0) - return 0; - - /* Compute the parallel degree */ - parallel_workers = (nrequested > 0) ? - Min(nrequested, nindexes_parallel) : nindexes_parallel; - - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); - - return parallel_workers; -} - /* * Update index statistics in pg_class if the statistics are accurate. */ @@ -3731,394 +3169,10 @@ update_index_statistics(LVRelState *vacrel) } /* - * Try to enter parallel mode and create a parallel context. Then initialize - * shared memory state. - * - * On success (when we can launch one or more workers), will set dead_items and - * lps in vacrel for caller. A set lps in vacrel state indicates that parallel - * VACUUM is currently active. 
- */ -static void -parallel_vacuum_begin(LVRelState *vacrel, int nrequested) -{ - LVParallelState *lps; - Relation *indrels = vacrel->indrels; - int nindexes = vacrel->nindexes; - ParallelContext *pcxt; - LVShared *shared; - VacDeadItems *dead_items; - LVParallelIndStats *pindstats; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - bool *will_parallel_vacuum; - int max_items; - Size est_pindstats_len; - Size est_shared_len; - Size est_dead_items_len; - int nindexes_mwm = 0; - int parallel_workers = 0; - int querylen; - - /* - * A parallel vacuum must be requested and there must be indexes on the - * relation - */ - Assert(nrequested >= 0); - Assert(nindexes > 0); - - /* - * Compute the number of parallel vacuum workers to launch - */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested, - will_parallel_vacuum); - if (parallel_workers <= 0) - { - /* Can't perform vacuum in parallel -- lps not set in vacrel */ - pfree(will_parallel_vacuum); - return; - } - - lps = (LVParallelState *) palloc0(sizeof(LVParallelState)); - - EnterParallelMode(); - pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", - parallel_workers); - Assert(pcxt->nworkers > 0); - lps->pcxt = pcxt; - lps->will_parallel_vacuum = will_parallel_vacuum; - - /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */ - est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes); - shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ - est_shared_len = sizeof(LVShared); - shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */ - max_items = dead_items_max_items(vacrel); - est_dead_items_len = vac_max_items_to_alloc_size(max_items); - shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ - - InitializeParallelDSM(pcxt); - - /* Prepare index vacuum stats */ - pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len); - for (int idx = 0; idx < nindexes; idx++) - { - Relation indrel = indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* - * Cleanup option should be either disabled, always performing in - * parallel or conditionally performing in parallel. 
- */ - Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); - Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - - if (!will_parallel_vacuum[idx]) - continue; - - if (indrel->rd_indam->amusemaintenanceworkmem) - nindexes_mwm++; - - /* - * Remember the number of indexes that support parallel operation for - * each phase. - */ - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - lps->nindexes_parallel_bulkdel++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) - lps->nindexes_parallel_cleanup++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) - lps->nindexes_parallel_condcleanup++; - } - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats); - lps->lvpindstats = pindstats; - - /* Prepare shared information */ - shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len); - MemSet(shared, 0, est_shared_len); - shared->relid = RelationGetRelid(vacrel->rel); - shared->elevel = elevel; - shared->maintenance_work_mem_worker = - (nindexes_mwm > 0) ? - maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; - - pg_atomic_init_u32(&(shared->cost_balance), 0); - pg_atomic_init_u32(&(shared->active_nworkers), 0); - pg_atomic_init_u32(&(shared->idx), 0); - - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); - lps->lvshared = shared; - - /* Prepare the dead_items space */ - dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc, - est_dead_items_len); - dead_items->max_items = max_items; - dead_items->num_items = 0; - MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items); - - /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize - */ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - lps->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - lps->wal_usage = wal_usage; - - /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } - - /* Success -- set dead_items and lps in leader's vacrel state */ - vacrel->dead_items = dead_items; - vacrel->lps = lps; -} - -/* - * Destroy the parallel context, and end parallel mode. - * - * Since writes are not allowed during parallel mode, copy the - * updated index statistics from DSM into local memory and then later use that - * to update the index statistics. One might think that we can exit from - * parallel mode, update the index statistics and then destroy parallel - * context, but that won't be safe (see ExitParallelMode). 
- */ -static void -parallel_vacuum_end(LVRelState *vacrel) -{ - IndexBulkDeleteResult **indstats = vacrel->indstats; - LVParallelState *lps = vacrel->lps; - int nindexes = vacrel->nindexes; - - Assert(!IsParallelWorker()); - - /* Copy the updated statistics */ - for (int idx = 0; idx < nindexes; idx++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]); - - if (pindstats->istat_updated) - { - indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult)); - } - else - indstats[idx] = NULL; - } - - DestroyParallelContext(lps->pcxt); - ExitParallelMode(); - - /* Deactivate parallel vacuum */ - pfree(lps->will_parallel_vacuum); - pfree(lps); - vacrel->lps = NULL; -} - -/* - * Returns false, if the given index can't participate in the next execution of - * parallel index vacuum or parallel index cleanup. - */ -static bool -parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel, - bool vacuum) -{ - uint8 vacoptions; - - vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* In parallel vacuum case, check if it supports parallel bulk-deletion */ - if (vacuum) - return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); - - /* Not safe, if the index does not support parallel cleanup */ - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) - return false; - - /* - * Not safe, if the index supports parallel cleanup conditionally, but we - * have already processed the index (for bulkdelete). We do this to avoid - * the need to invoke workers when parallel index cleanup doesn't need to - * scan the index. See the comments for option - * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support - * parallel cleanup conditionally. - */ - if (vacrel->num_index_scans > 0 && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - return false; - - return true; -} - -/* - * Perform work within a launched parallel process. - * - * Since parallel vacuum workers perform only index vacuum or index cleanup, - * we don't need to report progress information. - */ -void -parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) -{ - Relation rel; - Relation *indrels; - LVParallelIndStats *lvpindstats; - LVShared *lvshared; - VacDeadItems *dead_items; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - int nindexes; - char *sharedquery; - LVRelState vacrel; - ErrorContextCallback errcallback; - - /* - * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we - * don't support parallel vacuum for autovacuum as of now. - */ - Assert(MyProc->statusFlags == PROC_IN_VACUUM); - - lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, - false); - elevel = lvshared->elevel; - - elog(DEBUG1, "starting parallel vacuum worker"); - - /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); - - /* - * Open table. The lock mode is the same as the leader process. It's - * okay because the lock mode does not conflict among the parallel - * workers. - */ - rel = table_open(lvshared->relid, ShareUpdateExclusiveLock); - - /* - * Open all indexes. indrels are sorted in order by OID, which should be - * matched to the leader's one. 
- */ - vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); - Assert(nindexes > 0); - - /* Set index statistics */ - lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc, - PARALLEL_VACUUM_KEY_INDEX_STATS, - false); - - /* Set dead_items space (set as worker's vacrel dead_items below) */ - dead_items = (VacDeadItems *) shm_toc_lookup(toc, - PARALLEL_VACUUM_KEY_DEAD_ITEMS, - false); - - /* Set cost-based vacuum delay */ - VacuumCostActive = (VacuumCostDelay > 0); - VacuumCostBalance = 0; - VacuumPageHit = 0; - VacuumPageMiss = 0; - VacuumPageDirty = 0; - VacuumCostBalanceLocal = 0; - VacuumSharedCostBalance = &(lvshared->cost_balance); - VacuumActiveNWorkers = &(lvshared->active_nworkers); - - vacrel.rel = rel; - vacrel.indrels = indrels; - vacrel.nindexes = nindexes; - /* Each parallel VACUUM worker gets its own access strategy */ - vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM); - vacrel.indstats = (IndexBulkDeleteResult **) - palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); - - if (lvshared->maintenance_work_mem_worker > 0) - maintenance_work_mem = lvshared->maintenance_work_mem_worker; - - /* - * Initialize vacrel for use as error callback arg by parallel worker. - */ - vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel)); - vacrel.relname = pstrdup(RelationGetRelationName(rel)); - vacrel.indname = NULL; - vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */ - vacrel.dead_items = dead_items; - - /* Setup error traceback support for ereport() */ - errcallback.callback = vacuum_error_callback; - errcallback.arg = &vacrel; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); - - /* Process indexes to perform vacuum/cleanup */ - parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats); - - /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - - vac_close_indexes(nindexes, indrels, RowExclusiveLock); - table_close(rel, ShareUpdateExclusiveLock); - FreeAccessStrategy(vacrel.bstrategy); - pfree(vacrel.indstats); -} - -/* - * Error context callback for errors occurring during vacuum. + * Error context callback for errors occurring during vacuum. The error + * context messages for index phases should match the messages set in parallel + * vacuum. If you change this function for those phases, change + * parallel_vacuum_error_callback() as well. 
 */
static void
vacuum_error_callback(void *arg)
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index bb1881f573..ae7c7133dd 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -14,7 +14,6 @@
 #include "postgres.h"
 
-#include "access/heapam.h"
 #include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
@@ -25,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index e8504f0ae4..48f7348f91 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -59,6 +59,7 @@ OBJS = \
 	typecmds.o \
 	user.o \
 	vacuum.o \
+	vacuumparallel.o \
 	variable.o \
 	view.o
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 3b481bcf86..c94c187d36 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -7,8 +7,9 @@
  * commands, (b) code to compute various vacuum thresholds, and (c) index
  * vacuum code.
  *
- * VACUUM for heap AM is implemented in vacuumlazy.c, ANALYZE in analyze.c, and
- * VACUUM FULL is a variant of CLUSTER, handled in cluster.c.
+ * VACUUM for heap AM is implemented in vacuumlazy.c, parallel vacuum in
+ * vacuumparallel.c, ANALYZE in analyze.c, and VACUUM FULL is a variant of
+ * CLUSTER, handled in cluster.c.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
new file mode 100644
index 0000000000..5dd70c5273
--- /dev/null
+++ b/src/backend/commands/vacuumparallel.c
@@ -0,0 +1,1068 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuumparallel.c
+ *    Support routines for parallel vacuum execution.
+ *
+ * This file contains routines that are intended to support setting up, using,
+ * and tearing down a ParallelVacuumState.
+ *
+ * In a parallel vacuum, we perform both index bulk deletion and index cleanup
+ * with parallel worker processes. Individual indexes are processed by one
+ * vacuum process. ParallelVacuumState contains shared information as well as
+ * the memory space for storing dead items allocated in the DSM segment. We
+ * launch parallel worker processes at the start of parallel index
+ * bulk-deletion and index cleanup and once all indexes are processed, the
+ * parallel worker processes exit. Each time we process indexes in parallel,
+ * the parallel context is re-initialized so that the same DSM can be used for
+ * multiple passes of index bulk-deletion and index cleanup.
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/commands/vacuumparallel.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/amapi.h"
+#include "access/table.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "optimizer/paths.h"
+#include "pgstat.h"
+#include "storage/bufmgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+
+/*
+ * DSM keys for parallel vacuum.  Unlike other parallel execution code, since
+ * we don't need to worry about DSM keys conflicting with plan_node_id we can
+ * use small integers.
+ */
+#define PARALLEL_VACUUM_KEY_SHARED          1
+#define PARALLEL_VACUUM_KEY_DEAD_ITEMS      2
+#define PARALLEL_VACUUM_KEY_QUERY_TEXT      3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE    4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE       5
+#define PARALLEL_VACUUM_KEY_INDEX_STATS     6
+
+/*
+ * Shared information among parallel workers. So this is allocated in the DSM
+ * segment.
+ */
+typedef struct PVShared
+{
+    /*
+     * Target table relid and log level. These fields are not modified during
+     * the parallel vacuum.
+     */
+    Oid         relid;
+    int         elevel;
+
+    /*
+     * Fields for both index vacuum and cleanup.
+     *
+     * reltuples is the total number of input heap tuples. We set either old
+     * live tuples in the index vacuum case or the new live tuples in the
+     * index cleanup case.
+     *
+     * estimated_count is true if reltuples is an estimated value. (Note that
+     * reltuples could be -1 in this case, indicating we have no idea.)
+     */
+    double      reltuples;
+    bool        estimated_count;
+
+    /*
+     * In single process vacuum we could consume more memory during index
+     * vacuuming or cleanup apart from the memory for heap scanning. In
+     * parallel vacuum, since individual vacuum workers can consume memory
+     * equal to maintenance_work_mem, the new maintenance_work_mem for each
+     * worker is set such that the parallel operation doesn't consume more
+     * memory than single process vacuum.
+     */
+    int         maintenance_work_mem_worker;
+
+    /*
+     * Shared vacuum cost balance. During parallel vacuum,
+     * VacuumSharedCostBalance points to this value and it accumulates the
+     * balance of each parallel vacuum worker.
+     */
+    pg_atomic_uint32 cost_balance;
+
+    /*
+     * Number of active parallel workers. This is used for computing the
+     * minimum threshold of the vacuum cost balance before a worker sleeps for
+     * cost-based delay.
+     */
+    pg_atomic_uint32 active_nworkers;
+
+    /* Counter for vacuuming and cleanup */
+    pg_atomic_uint32 idx;
+} PVShared;
+
+/* Status used during parallel index vacuum or cleanup */
+typedef enum PVIndVacStatus
+{
+    PARALLEL_INDVAC_STATUS_INITIAL = 0,
+    PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
+    PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
+    PARALLEL_INDVAC_STATUS_COMPLETED
+} PVIndVacStatus;
+
+/*
+ * Struct for index vacuum statistics of an index that is used for parallel vacuum.
+ * This includes the status of parallel index vacuum as well as index statistics.
+ */
+typedef struct PVIndStats
+{
+    /*
+     * The following two fields are set by leader process before executing
+     * parallel index vacuum or parallel index cleanup. These fields are not
+     * fixed for the entire VACUUM operation. They are only fixed for an
+     * individual parallel index vacuum and cleanup.
+     *
+     * parallel_workers_can_process is true if both leader and worker can
+     * process the index, otherwise only leader can process it.
+     */
+    PVIndVacStatus status;
+    bool        parallel_workers_can_process;
+
+    /*
+     * Individual worker or leader stores the result of index vacuum or
+     * cleanup.
+     */
+    bool        istat_updated;  /* are the stats updated? */
+    IndexBulkDeleteResult istat;
+} PVIndStats;
+
+/* Struct for maintaining a parallel vacuum state. */
+typedef struct ParallelVacuumState
+{
+    /* NULL for worker processes */
+    ParallelContext *pcxt;
+
+    /* Target indexes */
+    Relation   *indrels;
+    int         nindexes;
+
+    /* Shared information among parallel vacuum workers */
+    PVShared   *shared;
+
+    /*
+     * Shared index statistics among parallel vacuum workers. The array
+     * element is allocated for every index, even those indexes where
+     * parallel index vacuuming is unsafe or not worthwhile (e.g.,
+     * will_parallel_vacuum[] is false). During parallel vacuum,
+     * IndexBulkDeleteResult of each index is kept in DSM and is copied into
+     * local memory at the end of parallel vacuum.
+     */
+    PVIndStats *indstats;
+
+    /* Shared dead items space among parallel vacuum workers */
+    VacDeadItems *dead_items;
+
+    /* Points to buffer usage area in DSM */
+    BufferUsage *buffer_usage;
+
+    /* Points to WAL usage area in DSM */
+    WalUsage   *wal_usage;
+
+    /*
+     * False if the index is a totally unsuitable target for all parallel
+     * processing. For example, the index could be smaller than the
+     * min_parallel_index_scan_size cutoff.
+     */
+    bool       *will_parallel_vacuum;
+
+    /*
+     * The number of indexes that support parallel index bulk-deletion,
+     * parallel index cleanup, and conditional parallel index cleanup,
+     * respectively.
+     */
+    int         nindexes_parallel_bulkdel;
+    int         nindexes_parallel_cleanup;
+    int         nindexes_parallel_condcleanup;
+
+    /* Buffer access strategy used by leader process */
+    BufferAccessStrategy bstrategy;
+
+    /*
+     * Error reporting state. The error callback is set only for worker
+     * processes during parallel index vacuum.
+     */
+    char       *relnamespace;
+    char       *relname;
+    char       *indname;
+    PVIndVacStatus status;
+} ParallelVacuumState;
+
+static int  parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+                                            bool *will_parallel_vacuum);
+static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
+                                                bool vacuum);
+static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs);
+static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
+                                              PVIndStats *indstats);
+static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
+                                                   bool vacuum);
+static void parallel_vacuum_error_callback(void *arg);
+
+/*
+ * Try to enter parallel mode and create a parallel context. Then initialize
+ * shared memory state.
+ *
+ * On success, return parallel vacuum state. Otherwise return NULL.
+ */
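A minimal sketch of the caller lifecycle this API expects, condensed from the
vacuumlazy.c hunks earlier in this patch; `vacrel`, `elevel`, `nworkers`, and
`max_items` are that file's variables, and error handling plus the serial
fallback are omitted:

	/* Set up the DSM segment and enter parallel mode (NULL if not possible) */
	vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels,
									   vacrel->nindexes, nworkers,
									   max_items, elevel, vacrel->bstrategy);
	if (vacrel->pvs != NULL)
	{
		/* The dead_items array now lives in the DSM segment */
		vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs);

		/* One bulk-deletion pass per round of index vacuuming... */
		parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples,
											vacrel->num_index_scans);

		/* ...then a single cleanup pass at the end */
		parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples,
											vacrel->num_index_scans,
											(vacrel->tupcount_pages < vacrel->rel_pages));

		/* Copy index stats out of DSM, destroy the context, exit parallel mode */
		parallel_vacuum_end(vacrel->pvs, vacrel->indstats);
		vacrel->pvs = NULL;
	}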
+ParallelVacuumState *
+parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
+                     int nrequested_workers, int max_items,
+                     int elevel, BufferAccessStrategy bstrategy)
+{
+    ParallelVacuumState *pvs;
+    ParallelContext *pcxt;
+    PVShared   *shared;
+    VacDeadItems *dead_items;
+    PVIndStats *indstats;
+    BufferUsage *buffer_usage;
+    WalUsage   *wal_usage;
+    bool       *will_parallel_vacuum;
+    Size        est_indstats_len;
+    Size        est_shared_len;
+    Size        est_dead_items_len;
+    int         nindexes_mwm = 0;
+    int         parallel_workers = 0;
+    int         querylen;
+
+    /*
+     * A parallel vacuum must be requested and there must be indexes on the
+     * relation.
+     */
+    Assert(nrequested_workers >= 0);
+    Assert(nindexes > 0);
+
+    /*
+     * Compute the number of parallel vacuum workers to launch.
+     */
+    will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
+    parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes,
+                                                       nrequested_workers,
+                                                       will_parallel_vacuum);
+    if (parallel_workers <= 0)
+    {
+        /* Can't perform vacuum in parallel -- return NULL */
+        pfree(will_parallel_vacuum);
+        return NULL;
+    }
+
+    pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState));
+    pvs->indrels = indrels;
+    pvs->nindexes = nindexes;
+    pvs->will_parallel_vacuum = will_parallel_vacuum;
+    pvs->bstrategy = bstrategy;
+
+    EnterParallelMode();
+    pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
+                                 parallel_workers);
+    Assert(pcxt->nworkers > 0);
+    pvs->pcxt = pcxt;
+
+    /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
+    est_indstats_len = mul_size(sizeof(PVIndStats), nindexes);
+    shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+    /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
+    est_shared_len = sizeof(PVShared);
+    shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+    /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */
+    est_dead_items_len = vac_max_items_to_alloc_size(max_items);
+    shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+    /*
+     * Estimate space for BufferUsage and WalUsage --
+     * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
+     *
+     * If there are no extensions loaded that care, we could skip this. We
+     * have no way of knowing whether anyone's looking at pgBufferUsage or
+     * pgWalUsage, so do it unconditionally.
+     */
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+    shm_toc_estimate_chunk(&pcxt->estimator,
+                           mul_size(sizeof(WalUsage), pcxt->nworkers));
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+
+    /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
+    if (debug_query_string)
+    {
+        querylen = strlen(debug_query_string);
+        shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
+        shm_toc_estimate_keys(&pcxt->estimator, 1);
+    }
+    else
+        querylen = 0;           /* keep compiler quiet */
+
+    InitializeParallelDSM(pcxt);
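The calls above follow the usual two-phase shm_toc protocol: estimate every
chunk (and one key per later insert) before InitializeParallelDSM(), then
allocate and publish each chunk afterwards. A minimal sketch of that
discipline using this file's PARALLEL_VACUUM_KEY_SHARED key:

	/* Phase 1: before InitializeParallelDSM(), reserve space and a key */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(PVShared));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	InitializeParallelDSM(pcxt);	/* sizes and creates the DSM segment */

	/* Phase 2: carve the chunk out of the segment and publish it by key */
	shared = (PVShared *) shm_toc_allocate(pcxt->toc, sizeof(PVShared));
	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);

	/* Workers later fetch the same chunk by key (false = must exist) */
	shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);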
+
+    /* Prepare index vacuum stats */
+    indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len);
+    for (int i = 0; i < nindexes; i++)
+    {
+        Relation indrel = indrels[i];
+        uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+        /*
+         * Cleanup option should be either disabled, always performing in
+         * parallel, or conditionally performing in parallel.
+         */
+        Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
+               ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
+        Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
+
+        if (!will_parallel_vacuum[i])
+            continue;
+
+        if (indrel->rd_indam->amusemaintenanceworkmem)
+            nindexes_mwm++;
+
+        /*
+         * Remember the number of indexes that support parallel operation for
+         * each phase.
+         */
+        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+            pvs->nindexes_parallel_bulkdel++;
+        if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
+            pvs->nindexes_parallel_cleanup++;
+        if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
+            pvs->nindexes_parallel_condcleanup++;
+    }
+    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats);
+    pvs->indstats = indstats;
+
+    /* Prepare shared information */
+    shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len);
+    MemSet(shared, 0, est_shared_len);
+    shared->relid = RelationGetRelid(rel);
+    shared->elevel = elevel;
+    shared->maintenance_work_mem_worker =
+        (nindexes_mwm > 0) ?
+        maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
+        maintenance_work_mem;
+
+    pg_atomic_init_u32(&(shared->cost_balance), 0);
+    pg_atomic_init_u32(&(shared->active_nworkers), 0);
+    pg_atomic_init_u32(&(shared->idx), 0);
+
+    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
+    pvs->shared = shared;
+
+    /* Prepare the dead_items space */
+    dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc,
+                                                   est_dead_items_len);
+    dead_items->max_items = max_items;
+    dead_items->num_items = 0;
+    MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items);
+    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items);
+    pvs->dead_items = dead_items;
+
+    /*
+     * Allocate space for each worker's BufferUsage and WalUsage; no need to
+     * initialize.
+     */
+    buffer_usage = shm_toc_allocate(pcxt->toc,
+                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
+    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+    pvs->buffer_usage = buffer_usage;
+    wal_usage = shm_toc_allocate(pcxt->toc,
+                                 mul_size(sizeof(WalUsage), pcxt->nworkers));
+    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+    pvs->wal_usage = wal_usage;
+
+    /* Store query string for workers */
+    if (debug_query_string)
+    {
+        char       *sharedquery;
+
+        sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
+        memcpy(sharedquery, debug_query_string, querylen + 1);
+        sharedquery[querylen] = '\0';
+        shm_toc_insert(pcxt->toc,
+                       PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
+    }
+
+    /* Success -- return parallel vacuum state */
+    return pvs;
+}
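A worked example of the maintenance_work_mem_worker value computed above, with
hypothetical numbers:

	/*
	 * Suppose maintenance_work_mem = 1GB, parallel_workers = 4, and two of
	 * the indexes use maintenance_work_mem (nindexes_mwm = 2; e.g. GIN
	 * indexes, whose AM sets amusemaintenanceworkmem).  At most
	 * Min(4, 2) = 2 of them can be vacuumed simultaneously, so each process
	 * may use
	 *
	 *     maintenance_work_mem / Min(parallel_workers, nindexes_mwm)
	 *   = 1GB / 2 = 512MB,
	 *
	 * keeping peak use at or below the single-process budget.  With
	 * nindexes_mwm = 0, the full 1GB setting is left unchanged.
	 */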
+
+/*
+ * Destroy the parallel context, and end parallel mode.
+ *
+ * Since writes are not allowed during parallel mode, copy the
+ * updated index statistics from DSM into local memory and then later use that
+ * to update the index statistics. One might think that we can exit from
+ * parallel mode, update the index statistics and then destroy the parallel
+ * context, but that won't be safe (see ExitParallelMode).
+ */
+void
+parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats)
+{
+    Assert(!IsParallelWorker());
+
+    /* Copy the updated statistics */
+    for (int i = 0; i < pvs->nindexes; i++)
+    {
+        PVIndStats *indstats = &(pvs->indstats[i]);
+
+        if (indstats->istat_updated)
+        {
+            istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
+            memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult));
+        }
+        else
+            istats[i] = NULL;
+    }
+
+    DestroyParallelContext(pvs->pcxt);
+    ExitParallelMode();
+
+    pfree(pvs->will_parallel_vacuum);
+    pfree(pvs);
+}
+
+/* Returns the dead items space */
+VacDeadItems *
+parallel_vacuum_get_dead_items(ParallelVacuumState *pvs)
+{
+    return pvs->dead_items;
+}
+
+/*
+ * Do parallel index bulk-deletion with parallel workers.
+ */
+void
+parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
+                                    int num_index_scans)
+{
+    Assert(!IsParallelWorker());
+
+    /*
+     * We can only provide an approximate value of num_heap_tuples, at least
+     * for now.
+     */
+    pvs->shared->reltuples = num_table_tuples;
+    pvs->shared->estimated_count = true;
+
+    parallel_vacuum_process_all_indexes(pvs, num_index_scans, true);
+}
+
+/*
+ * Do parallel index cleanup with parallel workers.
+ */
+void
+parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples,
+                                    int num_index_scans, bool estimated_count)
+{
+    Assert(!IsParallelWorker());
+
+    /*
+     * We can provide a better estimate of the total number of surviving
+     * tuples (we assume indexes are more interested in that than in the
+     * number of nominally live tuples).
+     */
+    pvs->shared->reltuples = num_table_tuples;
+    pvs->shared->estimated_count = estimated_count;
+
+    parallel_vacuum_process_all_indexes(pvs, num_index_scans, false);
+}
+
+/*
+ * Compute the number of parallel worker processes to request. Both index
+ * vacuum and index cleanup can be executed with parallel workers.
+ * The index is eligible for parallel vacuum iff its size is greater than
+ * min_parallel_index_scan_size as invoking workers for very small indexes
+ * can hurt performance.
+ *
+ * nrequested is the number of parallel workers that the user requested. If
+ * nrequested is 0, we compute the parallel degree based on nindexes, that is
+ * the number of indexes that support parallel vacuum. This function also
+ * sets will_parallel_vacuum to remember indexes that participate in parallel
+ * vacuum.
+ */
+static int
+parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested,
+                                bool *will_parallel_vacuum)
+{
+    int         nindexes_parallel = 0;
+    int         nindexes_parallel_bulkdel = 0;
+    int         nindexes_parallel_cleanup = 0;
+    int         parallel_workers;
+
+    /*
+     * We don't allow performing a parallel operation in a standalone backend
+     * or when parallelism is disabled.
+     */
+    if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
+        return 0;
+
+    /*
+     * Compute the number of indexes that can participate in parallel vacuum.
+     */
+    for (int i = 0; i < nindexes; i++)
+    {
+        Relation indrel = indrels[i];
+        uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+        /* Skip index that is not a suitable target for parallel index vacuum */
+        if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
+            RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size)
+            continue;
+
+        will_parallel_vacuum[i] = true;
+
+        if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
+            nindexes_parallel_bulkdel++;
+        if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
+            ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+            nindexes_parallel_cleanup++;
+    }
+
+    nindexes_parallel = Max(nindexes_parallel_bulkdel,
+                            nindexes_parallel_cleanup);
+
+    /* The leader process takes one index */
+    nindexes_parallel--;
+
+    /* No index supports parallel vacuum */
+    if (nindexes_parallel <= 0)
+        return 0;
+
+    /* Compute the parallel degree */
+    parallel_workers = (nrequested > 0) ?
+        Min(nrequested, nindexes_parallel) : nindexes_parallel;
+
+    /* Cap by max_parallel_maintenance_workers */
+    parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
+
+    return parallel_workers;
+}
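A compilable toy model of the capping arithmetic above, with the per-index
eligibility loop abstracted away (hypothetical standalone helper, not part of
PostgreSQL):

	#include <stdio.h>

	/* Simplified model of parallel_vacuum_compute_workers()'s final steps */
	static int
	compute_workers_model(int nindexes_parallel_bulkdel,
						  int nindexes_parallel_cleanup,
						  int nrequested,
						  int max_parallel_maintenance_workers)
	{
		/* Use the phase that can keep the most workers busy */
		int nindexes_parallel = nindexes_parallel_bulkdel > nindexes_parallel_cleanup
			? nindexes_parallel_bulkdel : nindexes_parallel_cleanup;
		int parallel_workers;

		/* The leader process takes one index itself */
		nindexes_parallel--;
		if (nindexes_parallel <= 0)
			return 0;

		/* An explicit user request caps the degree */
		parallel_workers = (nrequested > 0 && nrequested < nindexes_parallel)
			? nrequested : nindexes_parallel;

		/* Finally, cap by max_parallel_maintenance_workers */
		if (parallel_workers > max_parallel_maintenance_workers)
			parallel_workers = max_parallel_maintenance_workers;
		return parallel_workers;
	}

	int
	main(void)
	{
		/* 4 bulkdel-capable, 3 cleanup-capable indexes, no explicit request */
		printf("%d\n", compute_workers_model(4, 3, 0, 8));	/* prints 3 */
		return 0;
	}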
+
+/*
+ * Perform index vacuum or index cleanup with parallel workers. This function
+ * must be used by the parallel vacuum leader process.
+ */
+static void
+parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans,
+                                    bool vacuum)
+{
+    int         nworkers;
+    PVIndVacStatus new_status;
+
+    Assert(!IsParallelWorker());
+
+    if (vacuum)
+    {
+        new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
+
+        /* Determine the number of parallel workers to launch */
+        nworkers = pvs->nindexes_parallel_bulkdel;
+    }
+    else
+    {
+        new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP;
+
+        /* Determine the number of parallel workers to launch */
+        nworkers = pvs->nindexes_parallel_cleanup;
+
+        /* Add conditionally parallel-aware indexes on the first call */
+        if (num_index_scans == 0)
+            nworkers += pvs->nindexes_parallel_condcleanup;
+    }
+
+    /* The leader process will participate */
+    nworkers--;
+
+    /*
+     * It is possible that the parallel context is initialized with fewer
+     * workers than the number of indexes that need a separate worker in the
+     * current phase, so we need to consider it. See
+     * parallel_vacuum_compute_workers().
+     */
+    nworkers = Min(nworkers, pvs->pcxt->nworkers);
+
+    /*
+     * Set index vacuum status and mark whether parallel vacuum worker can
+     * process it.
+     */
+    for (int i = 0; i < pvs->nindexes; i++)
+    {
+        PVIndStats *indstats = &(pvs->indstats[i]);
+
+        Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL);
+        indstats->status = new_status;
+        indstats->parallel_workers_can_process =
+            (pvs->will_parallel_vacuum[i] &&
+             parallel_vacuum_index_is_parallel_safe(pvs->indrels[i],
+                                                    num_index_scans,
+                                                    vacuum));
+    }
+
+    /* Reset the parallel index processing counter */
+    pg_atomic_write_u32(&(pvs->shared->idx), 0);
+
+    /* Set up the shared cost-based vacuum delay and launch workers */
+    if (nworkers > 0)
+    {
+        /* Reinitialize parallel context to relaunch parallel workers */
+        if (num_index_scans > 0)
+            ReinitializeParallelDSM(pvs->pcxt);
+
+        /*
+         * Set up shared cost balance and the number of active workers for
+         * vacuum delay. We need to do this before launching workers as
+         * otherwise, they might not see the updated values for these
+         * parameters.
+         */
+        pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+        pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+        /*
+         * The number of workers can vary between the bulkdelete and cleanup
+         * phases.
+         */
+        ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+
+        LaunchParallelWorkers(pvs->pcxt);
+
+        if (pvs->pcxt->nworkers_launched > 0)
+        {
+            /*
+             * Reset the local cost values for the leader backend as we have
+             * already accumulated the remaining balance of heap.
+             */
+            VacuumCostBalance = 0;
+            VacuumCostBalanceLocal = 0;
+
+            /* Enable shared cost balance for the leader backend */
+            VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+            VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+        }
+
+        if (vacuum)
+            ereport(pvs->shared->elevel,
+                    (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+                                     "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+                                     pvs->pcxt->nworkers_launched),
+                            pvs->pcxt->nworkers_launched, nworkers)));
+        else
+            ereport(pvs->shared->elevel,
+                    (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+                                     "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+                                     pvs->pcxt->nworkers_launched),
+                            pvs->pcxt->nworkers_launched, nworkers)));
+    }
+
+    /* Vacuum the indexes that can be processed only by the leader process */
+    parallel_vacuum_process_unsafe_indexes(pvs);
+
+    /*
+     * Join as a parallel worker. The leader process alone vacuums all
+     * parallel-safe indexes in the case where no workers are launched.
+     */
+    parallel_vacuum_process_safe_indexes(pvs);
+
+    /*
+     * Next, accumulate buffer and WAL usage. (This must wait for the workers
+     * to finish, or we might get incomplete data.)
+     */
+    if (nworkers > 0)
+    {
+        /* Wait for all vacuum workers to finish */
+        WaitForParallelWorkersToFinish(pvs->pcxt);
+
+        for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+            InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+    }
+
+    /*
+     * Reset all index statuses back to initial (while checking that we have
+     * vacuumed all indexes).
+     */
+    for (int i = 0; i < pvs->nindexes; i++)
+    {
+        PVIndStats *indstats = &(pvs->indstats[i]);
+
+        if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+            elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+                 RelationGetRelationName(pvs->indrels[i]));
+
+        indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+    }
+
+    /*
+     * Carry the shared balance value to heap scan and disable shared costing
+     */
+    if (VacuumSharedCostBalance)
+    {
+        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+        VacuumSharedCostBalance = NULL;
+        VacuumActiveNWorkers = NULL;
+    }
+}
+         */
+        pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance);
+        pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0);
+
+        /*
+         * The number of workers can vary between the bulkdelete and cleanup
+         * phases.
+         */
+        ReinitializeParallelWorkers(pvs->pcxt, nworkers);
+
+        LaunchParallelWorkers(pvs->pcxt);
+
+        if (pvs->pcxt->nworkers_launched > 0)
+        {
+            /*
+             * Reset the local cost values for the leader backend, as we have
+             * already accumulated the remaining balance of the heap scan.
+             */
+            VacuumCostBalance = 0;
+            VacuumCostBalanceLocal = 0;
+
+            /* Enable the shared cost balance for the leader backend */
+            VacuumSharedCostBalance = &(pvs->shared->cost_balance);
+            VacuumActiveNWorkers = &(pvs->shared->active_nworkers);
+        }
+
+        if (vacuum)
+            ereport(pvs->shared->elevel,
+                    (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
+                                     "launched %d parallel vacuum workers for index vacuuming (planned: %d)",
+                                     pvs->pcxt->nworkers_launched),
+                            pvs->pcxt->nworkers_launched, nworkers)));
+        else
+            ereport(pvs->shared->elevel,
+                    (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
+                                     "launched %d parallel vacuum workers for index cleanup (planned: %d)",
+                                     pvs->pcxt->nworkers_launched),
+                            pvs->pcxt->nworkers_launched, nworkers)));
+    }
+
+    /* Vacuum the indexes that can be processed only by the leader process */
+    parallel_vacuum_process_unsafe_indexes(pvs);
+
+    /*
+     * Join as a parallel worker. The leader process alone processes all
+     * parallel-safe indexes in the case where no workers are launched.
+     */
+    parallel_vacuum_process_safe_indexes(pvs);
+
+    /*
+     * Next, accumulate buffer and WAL usage. (This must wait for the workers
+     * to finish, or we might get incomplete data.)
+     */
+    if (nworkers > 0)
+    {
+        /* Wait for all vacuum workers to finish */
+        WaitForParallelWorkersToFinish(pvs->pcxt);
+
+        for (int i = 0; i < pvs->pcxt->nworkers_launched; i++)
+            InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]);
+    }
+
+    /*
+     * Reset all index statuses back to initial (while checking that we have
+     * vacuumed all indexes).
+     */
+    for (int i = 0; i < pvs->nindexes; i++)
+    {
+        PVIndStats *indstats = &(pvs->indstats[i]);
+
+        if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED)
+            elog(ERROR, "parallel index vacuum on index \"%s\" is not completed",
+                 RelationGetRelationName(pvs->indrels[i]));
+
+        indstats->status = PARALLEL_INDVAC_STATUS_INITIAL;
+    }
+
+    /*
+     * Carry the shared balance value over to the heap scan and disable
+     * shared costing.
+     */
+    if (VacuumSharedCostBalance)
+    {
+        VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
+        VacuumSharedCostBalance = NULL;
+        VacuumActiveNWorkers = NULL;
+    }
+}
+
+/*
+ * Index vacuum/cleanup routine used by the leader process and parallel
+ * vacuum worker processes to vacuum the indexes in parallel.
+ */
+static void
+parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs)
+{
+    /*
+     * Increment the active worker count if we are able to launch any worker.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+    /* Loop until all indexes are vacuumed */
+    for (;;)
+    {
+        int         idx;
+        PVIndStats *indstats;
+
+        /* Get an index number to process */
+        idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1);
+
+        /* Done for all indexes? */
+        if (idx >= pvs->nindexes)
+            break;
+
+        indstats = &(pvs->indstats[idx]);
+
+        /*
+         * Skip indexes that are unsafe for workers or are unsuitable targets
+         * for parallel index vacuum (these are vacuumed by the leader in
+         * parallel_vacuum_process_unsafe_indexes()).
+         */
+        if (!indstats->parallel_workers_can_process)
+            continue;
+
+        /* Do vacuum or cleanup of the index */
+        parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats);
+    }
+
+    /*
+     * We have completed the index vacuum, so decrement the active worker
+     * count.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Perform index vacuuming or index cleanup in the leader process.
+ *
+ * Handles index vacuuming (or index cleanup) for indexes that are not
+ * parallel safe. It's possible that this will vary for a given index, based
+ * on details like whether we're performing index cleanup right now.
+ *
+ * Also performs vacuuming of smaller indexes that fell under the size cutoff
+ * enforced by parallel_vacuum_compute_workers().
+ */
+static void
+parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs)
+{
+    Assert(!IsParallelWorker());
+
+    /*
+     * Increment the active worker count if we are able to launch any worker.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
+
+    for (int i = 0; i < pvs->nindexes; i++)
+    {
+        PVIndStats *indstats = &(pvs->indstats[i]);
+
+        /* Skip indexes that are safe for workers */
+        if (indstats->parallel_workers_can_process)
+            continue;
+
+        /* Do vacuum or cleanup of the index */
+        parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats);
+    }
+
+    /*
+     * We have completed the index vacuum, so decrement the active worker
+     * count.
+     */
+    if (VacuumActiveNWorkers)
+        pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
+}
+
+/*
+ * Vacuum or clean up an index, either by the leader process or by one of the
+ * worker processes. After vacuuming the index, this function copies the
+ * index statistics returned by ambulkdelete or amvacuumcleanup to the DSM
+ * segment.
+ */
+static void
+parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
+                                  PVIndStats *indstats)
+{
+    IndexBulkDeleteResult *istat = NULL;
+    IndexBulkDeleteResult *istat_res;
+    IndexVacuumInfo ivinfo;
+
+    /*
+     * Update the pointer to the corresponding bulk-deletion result if
+     * someone has already updated it.
+     */
+    if (indstats->istat_updated)
+        istat = &(indstats->istat);
+
+    ivinfo.index = indrel;
+    ivinfo.analyze_only = false;
+    ivinfo.report_progress = false;
+    ivinfo.message_level = pvs->shared->elevel;
+    ivinfo.estimated_count = pvs->shared->estimated_count;
+    ivinfo.num_heap_tuples = pvs->shared->reltuples;
+    ivinfo.strategy = pvs->bstrategy;
+
+    /* Update error traceback information */
+    pvs->indname = pstrdup(RelationGetRelationName(indrel));
+    pvs->status = indstats->status;
+
+    switch (indstats->status)
+    {
+        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+            istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items);
+            break;
+        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+            istat_res = vac_cleanup_one_index(&ivinfo, istat);
+            break;
+        default:
+            elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"",
+                 indstats->status,
+                 RelationGetRelationName(indrel));
+    }
+
+    /*
+     * Copy the index bulk-deletion result returned from ambulkdelete and
+     * amvacuumcleanup to the DSM segment on the first cycle, because the
+     * result is allocated locally and the same index might be vacuumed by a
+     * different vacuum process in the next cycle. Copying the result
+     * normally happens only the first time an index is vacuumed. For any
+     * additional vacuum pass, we directly point to the result on the DSM
+     * segment and pass it to the vacuum index APIs so that workers can
+     * update it directly.
+     *
+     * Since all vacuum workers write the bulk-deletion result at different
+     * slots, we can write them without locking.
+     */
+    if (!indstats->istat_updated && istat_res != NULL)
+    {
+        memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult));
+        indstats->istat_updated = true;
+
+        /* Free the locally-allocated bulk-deletion result */
+        pfree(istat_res);
+    }
+
+    /*
+     * Update the status to completed. No need to lock here since each worker
+     * touches different indexes.
+     */
+    indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+
+    /* Reset error traceback information */
+    pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
+    pfree(pvs->indname);
+    pvs->indname = NULL;
+}
+
+/*
+ * Returns false if the given index can't participate in the next execution
+ * of parallel index vacuum or parallel index cleanup.
+ */
+static bool
+parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans,
+                                       bool vacuum)
+{
+    uint8       vacoptions;
+
+    vacoptions = indrel->rd_indam->amparallelvacuumoptions;
+
+    /* In the parallel vacuum case, check if it supports parallel bulk-deletion */
+    if (vacuum)
+        return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0);
+
+    /* Not safe if the index does not support parallel cleanup */
+    if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
+        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
+        return false;
+
+    /*
+     * Not safe if the index supports parallel cleanup only conditionally and
+     * we have already processed the index (for bulkdelete). We do this to
+     * avoid the need to invoke workers when parallel index cleanup doesn't
+     * need to scan the index. See the comments for the option
+     * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support
+     * parallel cleanup conditionally.
+     */
+    if (num_index_scans > 0 &&
+        ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
+        return false;
+
+    return true;
+}
+
+/*
+ * Perform work within a launched parallel process.
+ *
+ * Since parallel vacuum workers perform only index vacuum or index cleanup,
+ * we don't need to report progress information.
+ */
+void
+parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
+{
+    ParallelVacuumState pvs;
+    Relation    rel;
+    Relation   *indrels;
+    PVIndStats *indstats;
+    PVShared   *shared;
+    VacDeadItems *dead_items;
+    BufferUsage *buffer_usage;
+    WalUsage   *wal_usage;
+    int         nindexes;
+    char       *sharedquery;
+    ErrorContextCallback errcallback;
+
+    /*
+     * A parallel vacuum worker must have only the PROC_IN_VACUUM flag, since
+     * we don't yet support parallel vacuum for autovacuum.
+     */
+    Assert(MyProc->statusFlags == PROC_IN_VACUUM);
+
+    elog(DEBUG1, "starting parallel vacuum worker");
+
+    shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false);
+
+    /* Set debug_query_string for individual workers */
+    sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true);
+    debug_query_string = sharedquery;
+    pgstat_report_activity(STATE_RUNNING, debug_query_string);
+
+    /*
+     * Open the table. The lock mode is the same as that of the leader
+     * process; that's okay because the same lock mode does not conflict
+     * among the parallel workers.
+     */
+    rel = table_open(shared->relid, ShareUpdateExclusiveLock);
+
+    /*
+     * Open all indexes. indrels are sorted by OID, which should match the
+     * leader's ordering.
+     */
+    vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels);
+    Assert(nindexes > 0);
+
+    if (shared->maintenance_work_mem_worker > 0)
+        maintenance_work_mem = shared->maintenance_work_mem_worker;
+
+    /* Set index statistics */
+    indstats = (PVIndStats *) shm_toc_lookup(toc,
+                                             PARALLEL_VACUUM_KEY_INDEX_STATS,
+                                             false);
+
+    /* Set dead_items space */
+    dead_items = (VacDeadItems *) shm_toc_lookup(toc,
+                                                 PARALLEL_VACUUM_KEY_DEAD_ITEMS,
+                                                 false);
+
+    /* Set cost-based vacuum delay */
+    VacuumCostActive = (VacuumCostDelay > 0);
+    VacuumCostBalance = 0;
+    VacuumPageHit = 0;
+    VacuumPageMiss = 0;
+    VacuumPageDirty = 0;
+    VacuumCostBalanceLocal = 0;
+    VacuumSharedCostBalance = &(shared->cost_balance);
+    VacuumActiveNWorkers = &(shared->active_nworkers);
+
+    /* Set parallel vacuum state */
+    pvs.indrels = indrels;
+    pvs.nindexes = nindexes;
+    pvs.indstats = indstats;
+    pvs.shared = shared;
+    pvs.dead_items = dead_items;
+    pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel));
+    pvs.relname = pstrdup(RelationGetRelationName(rel));
+
+    /* These fields will be filled during index vacuum or cleanup */
+    pvs.indname = NULL;
+    pvs.status = PARALLEL_INDVAC_STATUS_INITIAL;
+
+    /* Each parallel VACUUM worker gets its own access strategy */
+    pvs.bstrategy = GetAccessStrategy(BAS_VACUUM);
+
+    /* Set up error traceback support for ereport() */
+    errcallback.callback = parallel_vacuum_error_callback;
+    errcallback.arg = &pvs;
+    errcallback.previous = error_context_stack;
+    error_context_stack = &errcallback;
+
+    /* Prepare to track buffer usage during parallel execution */
+    InstrStartParallelQuery();
+
+    /* Process indexes to perform vacuum/cleanup */
+    parallel_vacuum_process_safe_indexes(&pvs);
+
+    /* Report buffer/WAL usage during parallel execution */
+    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+    wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+                          &wal_usage[ParallelWorkerNumber]);
+
+    /* Pop the error context stack */
+    error_context_stack = errcallback.previous;
+
+    vac_close_indexes(nindexes, indrels, RowExclusiveLock);
+    table_close(rel, ShareUpdateExclusiveLock);
+    FreeAccessStrategy(pvs.bstrategy);
+}
+
+/*
+ * Error context callback for errors occurring during parallel index vacuum.
+ * The error context messages should match the messages set in the lazy
+ * vacuum error context. If you change this function, change
+ * vacuum_error_callback() as well.
+ */
+static void
+parallel_vacuum_error_callback(void *arg)
+{
+    ParallelVacuumState *errinfo = arg;
+
+    switch (errinfo->status)
+    {
+        case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE:
+            errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"",
+                       errinfo->indname,
+                       errinfo->relnamespace,
+                       errinfo->relname);
+            break;
+        case PARALLEL_INDVAC_STATUS_NEED_CLEANUP:
+            errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"",
+                       errinfo->indname,
+                       errinfo->relnamespace,
+                       errinfo->relname);
+            break;
+        case PARALLEL_INDVAC_STATUS_INITIAL:
+        case PARALLEL_INDVAC_STATUS_COMPLETED:
+        default:
+            return;
+    }
+}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 417dd288e5..f3fb1e93a5 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -198,7 +198,6 @@ extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
 struct VacuumParams;
 extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params,
 							BufferAccessStrategy bstrategy);
-extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
 
 /* in heap/heapam_visibility.c */
 extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 97bffa8ff1..5a36049be6 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -16,6 +16,7 @@
 #include "access/htup.h"
 #include "access/genam.h"
+#include "access/parallel.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
@@ -63,6 +64,9 @@
 /* value for checking vacuum flags */
 #define VACUUM_OPTION_MAX_VALID_VALUE ((1 << 3) - 1)
 
+/* Abstract type for parallel vacuum state */
+typedef struct ParallelVacuumState ParallelVacuumState;
+
 /*----------
  * ANALYZE builds one of these structs for each attribute (column) that is
  * to be analyzed.  The struct and subsidiary data are in anl_context,
@@ -305,6 +309,22 @@ extern IndexBulkDeleteResult *vac_cleanup_one_index(IndexVacuumInfo *ivinfo,
 													IndexBulkDeleteResult *istat);
 extern Size vac_max_items_to_alloc_size(int max_items);
 
+/* in commands/vacuumparallel.c */
+extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels,
+												 int nindexes, int nrequested_workers,
+												 int max_items, int elevel,
+												 BufferAccessStrategy bstrategy);
+extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats);
+extern VacDeadItems *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs);
+extern void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs,
+												long num_table_tuples,
+												int num_index_scans);
+extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs,
+												long num_table_tuples,
+												int num_index_scans,
+												bool estimated_count);
+extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
+
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, RangeVar *relation,
 						VacuumParams *params, List *va_cols, bool in_outer_xact,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9863508791..f093605472 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1306,13 +1306,8 @@ LPWSTR
 LSEG
 LUID
 LVPagePruneState
-LVParallelIndStats
-LVParallelIndVacStatus
-LVParallelState
 LVRelState
 LVSavedErrInfo
-LVShared
-LVSharedIndStats
 LWLock
 LWLockHandle
 LWLockMode
@@ -1775,7 +1770,10 @@ PTIterationArray
 PTOKEN_PRIVILEGES
 PTOKEN_USER
 PUTENVPROC
+PVIndStats
+PVIndVacStatus
 PVOID
+PVShared
 PX_Alias
 PX_Cipher
 PX_Combo
@@ -1809,6 +1807,7 @@ ParallelSlotResultHandler
 ParallelState
 ParallelTableScanDesc
 ParallelTableScanDescData
+ParallelVacuumState
 ParallelWorkerContext
 ParallelWorkerInfo
 Param
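A note on the worker-count logic in parallel_vacuum_compute_workers() above: it takes the larger of the bulkdel-capable and cleanup-capable index counts, reserves one index for the leader, honors an explicit worker request if one was given, and finally caps the result at max_parallel_maintenance_workers. The following standalone sketch models just that arithmetic outside the server; the helper name compute_workers and the hard-coded cap of 2 are illustrative, not PostgreSQL code:

    /* Standalone model of the worker-count arithmetic in
     * parallel_vacuum_compute_workers(); names and the cap are simplified. */
    #include <stdio.h>

    #define MAX_PARALLEL_MAINTENANCE_WORKERS 2

    static int min(int a, int b) { return a < b ? a : b; }
    static int max(int a, int b) { return a > b ? a : b; }

    /* nbulkdel/ncleanup: indexes supporting each parallel phase */
    static int
    compute_workers(int nbulkdel, int ncleanup, int nrequested)
    {
        int     nindexes_parallel = max(nbulkdel, ncleanup);
        int     parallel_workers;

        nindexes_parallel--;            /* the leader takes one index */
        if (nindexes_parallel <= 0)
            return 0;

        parallel_workers = (nrequested > 0) ?
            min(nrequested, nindexes_parallel) : nindexes_parallel;

        return min(parallel_workers, MAX_PARALLEL_MAINTENANCE_WORKERS);
    }

    int
    main(void)
    {
        /* 3 bulkdel-capable, 2 cleanup-capable, no explicit request:
         * max(3, 2) - 1 = 2 workers, which the cap leaves unchanged */
        printf("%d\n", compute_workers(3, 2, 0));   /* prints 2 */
        return 0;
    }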
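Each PVIndStats entry cycles through a fixed status sequence per phase: the leader arms every index with NEED_BULKDELETE or NEED_CLEANUP, whoever ends up processing an index flips it to COMPLETED, and the leader then verifies completion and resets everything to INITIAL before the next phase. A tiny single-process model of that per-phase lifecycle (the enum mirrors the patch; the driver loop is illustrative):

    #include <assert.h>
    #include <stdio.h>

    typedef enum
    {
        PARALLEL_INDVAC_STATUS_INITIAL = 0,
        PARALLEL_INDVAC_STATUS_NEED_BULKDELETE,
        PARALLEL_INDVAC_STATUS_NEED_CLEANUP,
        PARALLEL_INDVAC_STATUS_COMPLETED
    } PVIndVacStatus;

    #define NINDEXES 3

    int
    main(void)
    {
        PVIndVacStatus status[NINDEXES] = {0};  /* all INITIAL */

        /* Leader arms every index for the bulkdelete phase */
        for (int i = 0; i < NINDEXES; i++)
        {
            assert(status[i] == PARALLEL_INDVAC_STATUS_INITIAL);
            status[i] = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE;
        }

        /* Leader and workers mark indexes done as they process them */
        for (int i = 0; i < NINDEXES; i++)
            status[i] = PARALLEL_INDVAC_STATUS_COMPLETED;

        /* Leader verifies completion and resets for the next phase */
        for (int i = 0; i < NINDEXES; i++)
        {
            assert(status[i] == PARALLEL_INDVAC_STATUS_COMPLETED);
            status[i] = PARALLEL_INDVAC_STATUS_INITIAL;
        }

        printf("phase complete\n");
        return 0;
    }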
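The shared cost-balance hand-off around worker launch follows a publish/accumulate/absorb pattern: the leader writes its local VacuumCostBalance into the shared atomic before launching, zeroes its local counters, lets all participants accumulate into the shared value while the phase runs, and reads the final balance back for the heap scan once the workers are done. A simplified standalone model using C11 atomics (all variable names and numbers are illustrative):

    /* Standalone model of the cost-balance hand-off in
     * parallel_vacuum_process_all_indexes(); names are illustrative. */
    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_uint shared_balance;  /* models pvs->shared->cost_balance */

    int
    main(void)
    {
        unsigned local_balance = 120;   /* cost accrued by the heap scan */

        /* Leader publishes its balance before launching workers ... */
        atomic_store(&shared_balance, local_balance);
        local_balance = 0;

        /* ... participants accumulate into the shared value ... */
        atomic_fetch_add(&shared_balance, 30);  /* worker 1 */
        atomic_fetch_add(&shared_balance, 45);  /* worker 2 */

        /* ... and the leader absorbs it back after the workers finish */
        local_balance = atomic_load(&shared_balance);
        printf("balance carried back: %u\n", local_balance);   /* 195 */
        return 0;
    }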
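The index-claiming loop in parallel_vacuum_process_safe_indexes() relies on pg_atomic_fetch_add_u32() over pvs->shared->idx: every participant, leader included, atomically takes the next index number, so each index is claimed exactly once with no locking. A minimal model of that pattern with C11 atomics, run single-threaded here for simplicity (the names next_idx and claim_and_process are illustrative):

    /* Standalone model of the atomic work-claiming loop in
     * parallel_vacuum_process_safe_indexes(). */
    #include <stdatomic.h>
    #include <stdio.h>

    #define NINDEXES 5

    static atomic_uint next_idx;        /* stands in for pvs->shared->idx */

    static void
    claim_and_process(const char *who)
    {
        for (;;)
        {
            /* Atomically claim the next index number */
            unsigned idx = atomic_fetch_add(&next_idx, 1);

            if (idx >= NINDEXES)
                break;                  /* all indexes claimed */
            printf("%s processes index %u\n", who, idx);
        }
    }

    int
    main(void)
    {
        atomic_store(&next_idx, 0);     /* leader resets counter per phase */
        claim_and_process("leader");    /* single-threaded demo */
        return 0;
    }

Because the claim is a single fetch-and-add, the same loop is safe when several worker processes run it concurrently against the counter in the DSM segment.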
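Finally, parallel_vacuum_index_is_parallel_safe() reduces to a decision over the access method's amparallelvacuumoptions flags: bulkdelete requires VACUUM_OPTION_PARALLEL_BULKDEL; cleanup requires either cleanup flag; and an index whose cleanup is only conditionally parallel is skipped once a bulkdelete pass has already scanned it. A standalone sketch of that decision, where the flag values mirror vacuum.h and the function name is illustrative:

    #include <stdbool.h>
    #include <stdio.h>

    #define VACUUM_OPTION_PARALLEL_BULKDEL      (1 << 0)
    #define VACUUM_OPTION_PARALLEL_COND_CLEANUP (1 << 1)
    #define VACUUM_OPTION_PARALLEL_CLEANUP      (1 << 2)

    static bool
    is_parallel_safe(unsigned vacoptions, int num_index_scans, bool vacuum)
    {
        /* Bulkdelete phase: needs explicit bulkdel support */
        if (vacuum)
            return (vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0;

        /* Cleanup phase: needs some form of parallel cleanup support */
        if ((vacoptions & (VACUUM_OPTION_PARALLEL_CLEANUP |
                           VACUUM_OPTION_PARALLEL_COND_CLEANUP)) == 0)
            return false;

        /* Conditional cleanup only pays off if no bulkdelete pass ran */
        if (num_index_scans > 0 &&
            (vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
            return false;

        return true;
    }

    int
    main(void)
    {
        /* An AM supporting bulkdel plus conditional cleanup */
        unsigned opts = VACUUM_OPTION_PARALLEL_BULKDEL |
                        VACUUM_OPTION_PARALLEL_COND_CLEANUP;

        printf("%d\n", is_parallel_safe(opts, 0, false));   /* 1 */
        printf("%d\n", is_parallel_safe(opts, 1, false));   /* 0 */
        return 0;
    }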