From 8e1fae193864527c931a704bd7908e4fbc983f5c Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 23 Dec 2021 11:42:52 +0530 Subject: [PATCH] Move parallel vacuum code to vacuumparallel.c. This commit moves parallel vacuum related code to a new file commands/vacuumparallel.c so that any table AM supporting indexes can utilize parallel vacuum in order to call index AM callbacks (ambulkdelete and amvacuumcleanup) with parallel workers. Another reason for this refactoring is that the parallel vacuum isn't specific to heap so it doesn't make sense to keep this code in heap/vacuumlazy.c. Author: Masahiko Sawada, based on suggestion from Andres Freund Reviewed-by: Hou Zhijie, Amit Kapila, Haiying Tang Discussion: https://www.postgresql.org/message-id/20211030212101.ae3qcouatwmy7tbr%40alap3.anarazel.de --- src/backend/access/heap/vacuumlazy.c | 1002 +---------------------- src/backend/access/transam/parallel.c | 2 +- src/backend/commands/Makefile | 1 + src/backend/commands/vacuum.c | 5 +- src/backend/commands/vacuumparallel.c | 1068 +++++++++++++++++++++++++ src/include/access/heapam.h | 1 - src/include/commands/vacuum.h | 20 + src/tools/pgindent/typedefs.list | 9 +- 8 files changed, 1125 insertions(+), 983 deletions(-) create mode 100644 src/backend/commands/vacuumparallel.c diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index d8f1217504..cd603e6aa4 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -40,7 +40,6 @@ #include "access/heapam_xlog.h" #include "access/htup_details.h" #include "access/multixact.h" -#include "access/parallel.h" #include "access/transam.h" #include "access/visibilitymap.h" #include "access/xact.h" @@ -120,23 +119,11 @@ */ #define PREFETCH_SIZE ((BlockNumber) 32) -/* - * DSM keys for parallel vacuum. Unlike other parallel execution code, since - * we don't need to worry about DSM keys conflicting with plan_node_id we can - * use small integers. - */ -#define PARALLEL_VACUUM_KEY_SHARED 1 -#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2 -#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 -#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 -#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 -#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 - /* * Macro to check if we are in a parallel vacuum. If true, we are in the * parallel mode and the DSM segment is initialized. */ -#define ParallelVacuumIsActive(vacrel) ((vacrel)->lps != NULL) +#define ParallelVacuumIsActive(vacrel) ((vacrel)->pvs != NULL) /* Phases of vacuum during which we report error context. */ typedef enum @@ -149,135 +136,6 @@ typedef enum VACUUM_ERRCB_PHASE_TRUNCATE } VacErrPhase; -/* - * Shared information among parallel workers. So this is allocated in the DSM - * segment. - */ -typedef struct LVShared -{ - /* - * Target table relid and log level. These fields are not modified during - * the lazy vacuum. - */ - Oid relid; - int elevel; - - /* - * Fields for both index vacuum and cleanup. - * - * reltuples is the total number of input heap tuples. We set either old - * live tuples in the index vacuum case or the new live tuples in the - * index cleanup case. - * - * estimated_count is true if reltuples is an estimated value. (Note that - * reltuples could be -1 in this case, indicating we have no idea.) - */ - double reltuples; - bool estimated_count; - - /* - * In single process lazy vacuum we could consume more memory during index - * vacuuming or cleanup apart from the memory for heap scanning. In - * parallel vacuum, since individual vacuum workers can consume memory - * equal to maintenance_work_mem, the new maintenance_work_mem for each - * worker is set such that the parallel operation doesn't consume more - * memory than single process lazy vacuum. - */ - int maintenance_work_mem_worker; - - /* - * Shared vacuum cost balance. During parallel vacuum, - * VacuumSharedCostBalance points to this value and it accumulates the - * balance of each parallel vacuum worker. - */ - pg_atomic_uint32 cost_balance; - - /* - * Number of active parallel workers. This is used for computing the - * minimum threshold of the vacuum cost balance before a worker sleeps for - * cost-based delay. - */ - pg_atomic_uint32 active_nworkers; - - /* Counter for vacuuming and cleanup */ - pg_atomic_uint32 idx; -} LVShared; - -/* Status used during parallel index vacuum or cleanup */ -typedef enum LVParallelIndVacStatus -{ - PARALLEL_INDVAC_STATUS_INITIAL = 0, - PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, - PARALLEL_INDVAC_STATUS_NEED_CLEANUP, - PARALLEL_INDVAC_STATUS_COMPLETED -} LVParallelIndVacStatus; - -/* - * Struct for index vacuum statistics of an index that is used for parallel vacuum. - * This includes the status of parallel index vacuum as well as index statistics. - */ -typedef struct LVParallelIndStats -{ - /* - * The following two fields are set by leader process before executing - * parallel index vacuum or parallel index cleanup. These fields are not - * fixed for the entire VACUUM operation. They are only fixed for an - * individual parallel index vacuum and cleanup. - * - * parallel_workers_can_process is true if both leader and worker can - * process the index, otherwise only leader can process it. - */ - LVParallelIndVacStatus status; - bool parallel_workers_can_process; - - /* - * Individual worker or leader stores the result of index vacuum or - * cleanup. - */ - bool istat_updated; /* are the stats updated? */ - IndexBulkDeleteResult istat; -} LVParallelIndStats; - -/* Struct for maintaining a parallel vacuum state. */ -typedef struct LVParallelState -{ - ParallelContext *pcxt; - - /* Shared information among parallel vacuum workers */ - LVShared *lvshared; - - /* - * Shared index statistics among parallel vacuum workers. The array - * element is allocated for every index, even those indexes where parallel - * index vacuuming is unsafe or not worthwhile (e.g., - * will_parallel_vacuum[] is false). During parallel vacuum, - * IndexBulkDeleteResult of each index is kept in DSM and is copied into - * local memory at the end of parallel vacuum. - */ - LVParallelIndStats *lvpindstats; - - /* Points to buffer usage area in DSM */ - BufferUsage *buffer_usage; - - /* Points to WAL usage area in DSM */ - WalUsage *wal_usage; - - /* - * False if the index is totally unsuitable target for all parallel - * processing. For example, the index could be < - * min_parallel_index_scan_size cutoff. - */ - bool *will_parallel_vacuum; - - /* - * The number of indexes that support parallel index bulk-deletion and - * parallel index cleanup respectively. - */ - int nindexes_parallel_bulkdel; - int nindexes_parallel_cleanup; - int nindexes_parallel_condcleanup; -} LVParallelState; - typedef struct LVRelState { /* Target heap relation and its indexes */ @@ -295,9 +153,9 @@ typedef struct LVRelState bool do_index_cleanup; bool do_rel_truncate; - /* Buffer access strategy and parallel state */ + /* Buffer access strategy and parallel vacuum state */ BufferAccessStrategy bstrategy; - LVParallelState *lps; + ParallelVacuumState *pvs; /* rel's initial relfrozenxid and relminmxid */ TransactionId relfrozenxid; @@ -399,13 +257,6 @@ static bool lazy_check_needs_freeze(Buffer buf, bool *hastup, LVRelState *vacrel); static bool lazy_check_wraparound_failsafe(LVRelState *vacrel); static void lazy_cleanup_all_indexes(LVRelState *vacrel); -static void parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum); -static void parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared, - LVParallelIndStats *pindstats); -static void parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel); -static void parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel, - LVShared *shared, - LVParallelIndStats *pindstats); static IndexBulkDeleteResult *lazy_vacuum_one_index(Relation indrel, IndexBulkDeleteResult *istat, double reltuples, @@ -419,18 +270,11 @@ static bool should_attempt_truncation(LVRelState *vacrel); static void lazy_truncate_heap(LVRelState *vacrel); static BlockNumber count_nondeletable_pages(LVRelState *vacrel, bool *lock_waiter_detected); -static int dead_items_max_items(LVRelState *vacrel); static void dead_items_alloc(LVRelState *vacrel, int nworkers); static void dead_items_cleanup(LVRelState *vacrel); static bool heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, TransactionId *visibility_cutoff_xid, bool *all_frozen); -static int parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum); static void update_index_statistics(LVRelState *vacrel); -static void parallel_vacuum_begin(LVRelState *vacrel, int nrequested); -static void parallel_vacuum_end(LVRelState *vacrel); -static bool parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel, - bool vacuum); static void vacuum_error_callback(void *arg); static void update_vacuum_error_info(LVRelState *vacrel, LVSavedErrInfo *saved_vacrel, @@ -1601,7 +1445,8 @@ lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, bool aggressive) /* * Free resources managed by dead_items_alloc. This will end parallel - * mode when needed (it must end before we update index statistics). + * mode when needed (it must end before updating index statistics as we + * can't write in parallel mode). */ dead_items_cleanup(vacrel); @@ -2066,7 +1911,6 @@ lazy_vacuum(LVRelState *vacrel) /* Should not end up here with no indexes */ Assert(vacrel->nindexes > 0); - Assert(!IsParallelWorker()); Assert(vacrel->lpdead_item_pages > 0); if (!vacrel->do_index_vacuuming) @@ -2195,7 +2039,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) { bool allindexes = true; - Assert(!IsParallelWorker()); Assert(vacrel->nindexes > 0); Assert(vacrel->do_index_vacuuming); Assert(vacrel->do_index_cleanup); @@ -2235,7 +2078,8 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - parallel_vacuum_process_all_indexes(vacrel, true); + parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples, + vacrel->num_index_scans); /* * Do a postcheck to consider applying wraparound failsafe now. Note @@ -2608,353 +2452,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) return false; } -/* - * Perform index vacuum or index cleanup with parallel workers. This function - * must be used by the parallel vacuum leader process. - */ -static void -parallel_vacuum_process_all_indexes(LVRelState *vacrel, bool vacuum) -{ - LVParallelState *lps = vacrel->lps; - LVParallelIndVacStatus new_status; - int nworkers; - - Assert(!IsParallelWorker()); - Assert(ParallelVacuumIsActive(vacrel)); - Assert(vacrel->nindexes > 0); - - if (vacuum) - { - /* - * We can only provide an approximate value of num_heap_tuples, at - * least for now. Matches serial VACUUM case. - */ - vacrel->lps->lvshared->reltuples = vacrel->old_live_tuples; - vacrel->lps->lvshared->estimated_count = true; - - new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE; - - /* Determine the number of parallel workers to launch */ - nworkers = vacrel->lps->nindexes_parallel_bulkdel; - } - else - { - /* - * We can provide a better estimate of total number of surviving - * tuples (we assume indexes are more interested in that than in the - * number of nominally live tuples). - */ - vacrel->lps->lvshared->reltuples = vacrel->new_rel_tuples; - vacrel->lps->lvshared->estimated_count = - (vacrel->tupcount_pages < vacrel->rel_pages); - - new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP; - - /* Determine the number of parallel workers to launch */ - nworkers = vacrel->lps->nindexes_parallel_cleanup; - - /* Add conditionally parallel-aware indexes if in the first time call */ - if (vacrel->num_index_scans == 0) - nworkers += vacrel->lps->nindexes_parallel_condcleanup; - } - - /* The leader process will participate */ - nworkers--; - - /* - * It is possible that parallel context is initialized with fewer workers - * than the number of indexes that need a separate worker in the current - * phase, so we need to consider it. See - * parallel_vacuum_compute_workers(). - */ - nworkers = Min(nworkers, lps->pcxt->nworkers); - - /* - * Set index vacuum status and mark whether parallel vacuum worker can - * process it. - */ - for (int i = 0; i < vacrel->nindexes; i++) - { - LVParallelIndStats *pindstats = &(vacrel->lps->lvpindstats[i]); - - Assert(pindstats->status == PARALLEL_INDVAC_STATUS_INITIAL); - pindstats->status = new_status; - pindstats->parallel_workers_can_process = - (lps->will_parallel_vacuum[i] & - parallel_vacuum_index_is_parallel_safe(vacrel, vacrel->indrels[i], - vacuum)); - } - - /* Reset the parallel index processing counter */ - pg_atomic_write_u32(&(lps->lvshared->idx), 0); - - /* Setup the shared cost-based vacuum delay and launch workers */ - if (nworkers > 0) - { - /* Reinitialize parallel context to relaunch parallel workers */ - if (vacrel->num_index_scans > 0) - ReinitializeParallelDSM(lps->pcxt); - - /* - * Set up shared cost balance and the number of active workers for - * vacuum delay. We need to do this before launching workers as - * otherwise, they might not see the updated values for these - * parameters. - */ - pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance); - pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0); - - /* - * The number of workers can vary between bulkdelete and cleanup - * phase. - */ - ReinitializeParallelWorkers(lps->pcxt, nworkers); - - LaunchParallelWorkers(lps->pcxt); - - if (lps->pcxt->nworkers_launched > 0) - { - /* - * Reset the local cost values for leader backend as we have - * already accumulated the remaining balance of heap. - */ - VacuumCostBalance = 0; - VacuumCostBalanceLocal = 0; - - /* Enable shared cost balance for leader backend */ - VacuumSharedCostBalance = &(lps->lvshared->cost_balance); - VacuumActiveNWorkers = &(lps->lvshared->active_nworkers); - } - - if (vacuum) - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", - "launched %d parallel vacuum workers for index vacuuming (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - else - ereport(elevel, - (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", - "launched %d parallel vacuum workers for index cleanup (planned: %d)", - lps->pcxt->nworkers_launched), - lps->pcxt->nworkers_launched, nworkers))); - } - - /* Process the indexes that can be processed by only leader process */ - parallel_vacuum_process_unsafe_indexes(vacrel); - - /* - * Join as a parallel worker. The leader process alone processes all - * parallel-safe indexes in the case where no workers are launched. - */ - parallel_vacuum_process_safe_indexes(vacrel, lps->lvshared, lps->lvpindstats); - - /* - * Next, accumulate buffer and WAL usage. (This must wait for the workers - * to finish, or we might get incomplete data.) - */ - if (nworkers > 0) - { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(lps->pcxt); - - for (int i = 0; i < lps->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]); - } - - /* - * Reset all index status back to initial (while checking that we have - * processed all indexes). - */ - for (int i = 0; i < vacrel->nindexes; i++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[i]); - - if (pindstats->status != PARALLEL_INDVAC_STATUS_COMPLETED) - elog(ERROR, "parallel index vacuum on index \"%s\" is not completed", - RelationGetRelationName(vacrel->indrels[i])); - - pindstats->status = PARALLEL_INDVAC_STATUS_INITIAL; - } - - /* - * Carry the shared balance value to heap scan and disable shared costing - */ - if (VacuumSharedCostBalance) - { - VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); - VacuumSharedCostBalance = NULL; - VacuumActiveNWorkers = NULL; - } -} - -/* - * Index vacuum/cleanup routine used by the leader process and parallel - * vacuum worker processes to process the indexes in parallel. - */ -static void -parallel_vacuum_process_safe_indexes(LVRelState *vacrel, LVShared *shared, - LVParallelIndStats *pindstats) -{ - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - /* Loop until all indexes are vacuumed */ - for (;;) - { - int idx; - LVParallelIndStats *pis; - - /* Get an index number to process */ - idx = pg_atomic_fetch_add_u32(&(shared->idx), 1); - - /* Done for all indexes? */ - if (idx >= vacrel->nindexes) - break; - - pis = &(pindstats[idx]); - - /* - * Skip processing index that is unsafe for workers or has an - * unsuitable target for parallel index vacuum (this is processed in - * parallel_vacuum_process_unsafe_indexes() by the leader). - */ - if (!pis->parallel_workers_can_process) - continue; - - /* Do vacuum or cleanup of the index */ - parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx], - shared, pis); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. - */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Perform parallel processing of indexes in leader process. - * - * Handles index vacuuming (or index cleanup) for indexes that are not - * parallel safe. It's possible that this will vary for a given index, based - * on details like whether we're performing index cleanup right now. - * - * Also performs processing of smaller indexes that fell under the size cutoff - * enforced by parallel_vacuum_compute_workers(). - */ -static void -parallel_vacuum_process_unsafe_indexes(LVRelState *vacrel) -{ - LVParallelState *lps = vacrel->lps; - - Assert(!IsParallelWorker()); - - /* - * Increment the active worker count if we are able to launch any worker. - */ - if (VacuumActiveNWorkers) - pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); - - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]); - - /* Skip, indexes that are safe for workers */ - if (pindstats->parallel_workers_can_process) - continue; - - /* Do vacuum or cleanup of the index */ - parallel_vacuum_process_one_index(vacrel, vacrel->indrels[idx], - lps->lvshared, pindstats); - } - - /* - * We have completed the index vacuum so decrement the active worker - * count. - */ - if (VacuumActiveNWorkers) - pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); -} - -/* - * Vacuum or cleanup index either by leader process or by one of the worker - * process. After processing the index this function copies the index - * statistics returned from ambulkdelete and amvacuumcleanup to the DSM - * segment. - */ -static void -parallel_vacuum_process_one_index(LVRelState *vacrel, Relation indrel, - LVShared *shared, LVParallelIndStats *pindstats) -{ - IndexBulkDeleteResult *istat = NULL; - IndexBulkDeleteResult *istat_res; - - /* - * Update the pointer to the corresponding bulk-deletion result if someone - * has already updated it - */ - if (pindstats->istat_updated) - istat = &(pindstats->istat); - - switch (pindstats->status) - { - case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: - istat_res = lazy_vacuum_one_index(indrel, istat, - shared->reltuples, vacrel); - break; - case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: - istat_res = lazy_cleanup_one_index(indrel, istat, - shared->reltuples, - shared->estimated_count, - vacrel); - break; - default: - elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", - pindstats->status, - RelationGetRelationName(indrel)); - } - - /* - * Copy the index bulk-deletion result returned from ambulkdelete and - * amvacuumcleanup to the DSM segment if it's the first cycle because they - * allocate locally and it's possible that an index will be vacuumed by a - * different vacuum process the next cycle. Copying the result normally - * happens only the first time an index is vacuumed. For any additional - * vacuum pass, we directly point to the result on the DSM segment and - * pass it to vacuum index APIs so that workers can update it directly. - * - * Since all vacuum workers write the bulk-deletion result at different - * slots we can write them without locking. - */ - if (!pindstats->istat_updated && istat_res != NULL) - { - memcpy(&(pindstats->istat), istat_res, sizeof(IndexBulkDeleteResult)); - pindstats->istat_updated = true; - - /* Free the locally-allocated bulk-deletion result */ - pfree(istat_res); - } - - /* - * Update the status to completed. No need to lock here since each worker - * touches different indexes. - */ - pindstats->status = PARALLEL_INDVAC_STATUS_COMPLETED; -} - /* * lazy_cleanup_all_indexes() -- cleanup all indexes of relation. */ static void lazy_cleanup_all_indexes(LVRelState *vacrel) { - Assert(!IsParallelWorker()); Assert(vacrel->nindexes > 0); /* Report that we are now cleaning up indexes */ @@ -2980,7 +2483,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) else { /* Outsource everything to parallel variant */ - parallel_vacuum_process_all_indexes(vacrel, false); + parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, + vacrel->num_index_scans, + (vacrel->tupcount_pages < vacrel->rel_pages)); } } @@ -3409,8 +2914,6 @@ dead_items_max_items(LVRelState *vacrel) autovacuum_work_mem != -1 ? autovacuum_work_mem : maintenance_work_mem; - Assert(!IsParallelWorker()); - if (vacrel->nindexes > 0) { BlockNumber rel_pages = vacrel->rel_pages; @@ -3448,6 +2951,9 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) VacDeadItems *dead_items; int max_items; + max_items = dead_items_max_items(vacrel); + Assert(max_items >= MaxHeapTuplesPerPage); + /* * Initialize state for a parallel vacuum. As of now, only one worker can * be used for an index, so we invoke parallelism only if there are at @@ -3471,15 +2977,20 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->relname))); } else - parallel_vacuum_begin(vacrel, nworkers); + vacrel->pvs = parallel_vacuum_init(vacrel->rel, vacrel->indrels, + vacrel->nindexes, nworkers, + max_items, elevel, + vacrel->bstrategy); - /* If parallel mode started, vacrel->dead_items allocated in DSM */ + /* If parallel mode started, dead_items space is allocated in DSM */ if (ParallelVacuumIsActive(vacrel)) + { + vacrel->dead_items = parallel_vacuum_get_dead_items(vacrel->pvs); return; + } } /* Serial VACUUM case */ - max_items = dead_items_max_items(vacrel); dead_items = (VacDeadItems *) palloc(vac_max_items_to_alloc_size(max_items)); dead_items->max_items = max_items; dead_items->num_items = 0; @@ -3499,11 +3010,9 @@ dead_items_cleanup(LVRelState *vacrel) return; } - /* - * End parallel mode before updating index statistics as we cannot write - * during parallel mode. - */ - parallel_vacuum_end(vacrel); + /* End parallel mode */ + parallel_vacuum_end(vacrel->pvs, vacrel->indstats); + vacrel->pvs = NULL; } /* @@ -3627,77 +3136,6 @@ heap_page_is_all_visible(LVRelState *vacrel, Buffer buf, return all_visible; } -/* - * Compute the number of parallel worker processes to request. Both index - * vacuum and index cleanup can be executed with parallel workers. The index - * is eligible for parallel vacuum iff its size is greater than - * min_parallel_index_scan_size as invoking workers for very small indexes - * can hurt performance. - * - * nrequested is the number of parallel workers that user requested. If - * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. - */ -static int -parallel_vacuum_compute_workers(LVRelState *vacrel, int nrequested, - bool *will_parallel_vacuum) -{ - int nindexes_parallel = 0; - int nindexes_parallel_bulkdel = 0; - int nindexes_parallel_cleanup = 0; - int parallel_workers; - - /* - * We don't allow performing parallel operation in standalone backend or - * when parallelism is disabled. - */ - if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) - return 0; - - /* - * Compute the number of indexes that can participate in parallel vacuum. - */ - for (int idx = 0; idx < vacrel->nindexes; idx++) - { - Relation indrel = vacrel->indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* Skip index that is not a suitable target for parallel index vacuum */ - if (vacoptions == VACUUM_OPTION_NO_PARALLEL || - RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) - continue; - - will_parallel_vacuum[idx] = true; - - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - nindexes_parallel_bulkdel++; - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - nindexes_parallel_cleanup++; - } - - nindexes_parallel = Max(nindexes_parallel_bulkdel, - nindexes_parallel_cleanup); - - /* The leader process takes one index */ - nindexes_parallel--; - - /* No index supports parallel vacuum */ - if (nindexes_parallel <= 0) - return 0; - - /* Compute the parallel degree */ - parallel_workers = (nrequested > 0) ? - Min(nrequested, nindexes_parallel) : nindexes_parallel; - - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); - - return parallel_workers; -} - /* * Update index statistics in pg_class if the statistics are accurate. */ @@ -3731,394 +3169,10 @@ update_index_statistics(LVRelState *vacrel) } /* - * Try to enter parallel mode and create a parallel context. Then initialize - * shared memory state. - * - * On success (when we can launch one or more workers), will set dead_items and - * lps in vacrel for caller. A set lps in vacrel state indicates that parallel - * VACUUM is currently active. - */ -static void -parallel_vacuum_begin(LVRelState *vacrel, int nrequested) -{ - LVParallelState *lps; - Relation *indrels = vacrel->indrels; - int nindexes = vacrel->nindexes; - ParallelContext *pcxt; - LVShared *shared; - VacDeadItems *dead_items; - LVParallelIndStats *pindstats; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - bool *will_parallel_vacuum; - int max_items; - Size est_pindstats_len; - Size est_shared_len; - Size est_dead_items_len; - int nindexes_mwm = 0; - int parallel_workers = 0; - int querylen; - - /* - * A parallel vacuum must be requested and there must be indexes on the - * relation - */ - Assert(nrequested >= 0); - Assert(nindexes > 0); - - /* - * Compute the number of parallel vacuum workers to launch - */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = parallel_vacuum_compute_workers(vacrel, nrequested, - will_parallel_vacuum); - if (parallel_workers <= 0) - { - /* Can't perform vacuum in parallel -- lps not set in vacrel */ - pfree(will_parallel_vacuum); - return; - } - - lps = (LVParallelState *) palloc0(sizeof(LVParallelState)); - - EnterParallelMode(); - pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", - parallel_workers); - Assert(pcxt->nworkers > 0); - lps->pcxt = pcxt; - lps->will_parallel_vacuum = will_parallel_vacuum; - - /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_STATS */ - est_pindstats_len = mul_size(sizeof(LVParallelIndStats), nindexes); - shm_toc_estimate_chunk(&pcxt->estimator, est_pindstats_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ - est_shared_len = sizeof(LVShared); - shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */ - max_items = dead_items_max_items(vacrel); - est_dead_items_len = vac_max_items_to_alloc_size(max_items); - shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* - * Estimate space for BufferUsage and WalUsage -- - * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); - - /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ - - InitializeParallelDSM(pcxt); - - /* Prepare index vacuum stats */ - pindstats = (LVParallelIndStats *) shm_toc_allocate(pcxt->toc, est_pindstats_len); - for (int idx = 0; idx < nindexes; idx++) - { - Relation indrel = indrels[idx]; - uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* - * Cleanup option should be either disabled, always performing in - * parallel or conditionally performing in parallel. - */ - Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); - Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - - if (!will_parallel_vacuum[idx]) - continue; - - if (indrel->rd_indam->amusemaintenanceworkmem) - nindexes_mwm++; - - /* - * Remember the number of indexes that support parallel operation for - * each phase. - */ - if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) - lps->nindexes_parallel_bulkdel++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) - lps->nindexes_parallel_cleanup++; - if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) - lps->nindexes_parallel_condcleanup++; - } - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, pindstats); - lps->lvpindstats = pindstats; - - /* Prepare shared information */ - shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared_len); - MemSet(shared, 0, est_shared_len); - shared->relid = RelationGetRelid(vacrel->rel); - shared->elevel = elevel; - shared->maintenance_work_mem_worker = - (nindexes_mwm > 0) ? - maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : - maintenance_work_mem; - - pg_atomic_init_u32(&(shared->cost_balance), 0); - pg_atomic_init_u32(&(shared->active_nworkers), 0); - pg_atomic_init_u32(&(shared->idx), 0); - - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); - lps->lvshared = shared; - - /* Prepare the dead_items space */ - dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc, - est_dead_items_len); - dead_items->max_items = max_items; - dead_items->num_items = 0; - MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items); - - /* - * Allocate space for each worker's BufferUsage and WalUsage; no need to - * initialize - */ - buffer_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(BufferUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); - lps->buffer_usage = buffer_usage; - wal_usage = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(WalUsage), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); - lps->wal_usage = wal_usage; - - /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } - - /* Success -- set dead_items and lps in leader's vacrel state */ - vacrel->dead_items = dead_items; - vacrel->lps = lps; -} - -/* - * Destroy the parallel context, and end parallel mode. - * - * Since writes are not allowed during parallel mode, copy the - * updated index statistics from DSM into local memory and then later use that - * to update the index statistics. One might think that we can exit from - * parallel mode, update the index statistics and then destroy parallel - * context, but that won't be safe (see ExitParallelMode). - */ -static void -parallel_vacuum_end(LVRelState *vacrel) -{ - IndexBulkDeleteResult **indstats = vacrel->indstats; - LVParallelState *lps = vacrel->lps; - int nindexes = vacrel->nindexes; - - Assert(!IsParallelWorker()); - - /* Copy the updated statistics */ - for (int idx = 0; idx < nindexes; idx++) - { - LVParallelIndStats *pindstats = &(lps->lvpindstats[idx]); - - if (pindstats->istat_updated) - { - indstats[idx] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); - memcpy(indstats[idx], &pindstats->istat, sizeof(IndexBulkDeleteResult)); - } - else - indstats[idx] = NULL; - } - - DestroyParallelContext(lps->pcxt); - ExitParallelMode(); - - /* Deactivate parallel vacuum */ - pfree(lps->will_parallel_vacuum); - pfree(lps); - vacrel->lps = NULL; -} - -/* - * Returns false, if the given index can't participate in the next execution of - * parallel index vacuum or parallel index cleanup. - */ -static bool -parallel_vacuum_index_is_parallel_safe(LVRelState *vacrel, Relation indrel, - bool vacuum) -{ - uint8 vacoptions; - - vacoptions = indrel->rd_indam->amparallelvacuumoptions; - - /* In parallel vacuum case, check if it supports parallel bulk-deletion */ - if (vacuum) - return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); - - /* Not safe, if the index does not support parallel cleanup */ - if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) - return false; - - /* - * Not safe, if the index supports parallel cleanup conditionally, but we - * have already processed the index (for bulkdelete). We do this to avoid - * the need to invoke workers when parallel index cleanup doesn't need to - * scan the index. See the comments for option - * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support - * parallel cleanup conditionally. - */ - if (vacrel->num_index_scans > 0 && - ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) - return false; - - return true; -} - -/* - * Perform work within a launched parallel process. - * - * Since parallel vacuum workers perform only index vacuum or index cleanup, - * we don't need to report progress information. - */ -void -parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) -{ - Relation rel; - Relation *indrels; - LVParallelIndStats *lvpindstats; - LVShared *lvshared; - VacDeadItems *dead_items; - BufferUsage *buffer_usage; - WalUsage *wal_usage; - int nindexes; - char *sharedquery; - LVRelState vacrel; - ErrorContextCallback errcallback; - - /* - * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we - * don't support parallel vacuum for autovacuum as of now. - */ - Assert(MyProc->statusFlags == PROC_IN_VACUUM); - - lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, - false); - elevel = lvshared->elevel; - - elog(DEBUG1, "starting parallel vacuum worker"); - - /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); - - /* - * Open table. The lock mode is the same as the leader process. It's - * okay because the lock mode does not conflict among the parallel - * workers. - */ - rel = table_open(lvshared->relid, ShareUpdateExclusiveLock); - - /* - * Open all indexes. indrels are sorted in order by OID, which should be - * matched to the leader's one. - */ - vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); - Assert(nindexes > 0); - - /* Set index statistics */ - lvpindstats = (LVParallelIndStats *) shm_toc_lookup(toc, - PARALLEL_VACUUM_KEY_INDEX_STATS, - false); - - /* Set dead_items space (set as worker's vacrel dead_items below) */ - dead_items = (VacDeadItems *) shm_toc_lookup(toc, - PARALLEL_VACUUM_KEY_DEAD_ITEMS, - false); - - /* Set cost-based vacuum delay */ - VacuumCostActive = (VacuumCostDelay > 0); - VacuumCostBalance = 0; - VacuumPageHit = 0; - VacuumPageMiss = 0; - VacuumPageDirty = 0; - VacuumCostBalanceLocal = 0; - VacuumSharedCostBalance = &(lvshared->cost_balance); - VacuumActiveNWorkers = &(lvshared->active_nworkers); - - vacrel.rel = rel; - vacrel.indrels = indrels; - vacrel.nindexes = nindexes; - /* Each parallel VACUUM worker gets its own access strategy */ - vacrel.bstrategy = GetAccessStrategy(BAS_VACUUM); - vacrel.indstats = (IndexBulkDeleteResult **) - palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); - - if (lvshared->maintenance_work_mem_worker > 0) - maintenance_work_mem = lvshared->maintenance_work_mem_worker; - - /* - * Initialize vacrel for use as error callback arg by parallel worker. - */ - vacrel.relnamespace = get_namespace_name(RelationGetNamespace(rel)); - vacrel.relname = pstrdup(RelationGetRelationName(rel)); - vacrel.indname = NULL; - vacrel.phase = VACUUM_ERRCB_PHASE_UNKNOWN; /* Not yet processing */ - vacrel.dead_items = dead_items; - - /* Setup error traceback support for ereport() */ - errcallback.callback = vacuum_error_callback; - errcallback.arg = &vacrel; - errcallback.previous = error_context_stack; - error_context_stack = &errcallback; - - /* Prepare to track buffer usage during parallel execution */ - InstrStartParallelQuery(); - - /* Process indexes to perform vacuum/cleanup */ - parallel_vacuum_process_safe_indexes(&vacrel, lvshared, lvpindstats); - - /* Report buffer/WAL usage during parallel execution */ - buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], - &wal_usage[ParallelWorkerNumber]); - - /* Pop the error context stack */ - error_context_stack = errcallback.previous; - - vac_close_indexes(nindexes, indrels, RowExclusiveLock); - table_close(rel, ShareUpdateExclusiveLock); - FreeAccessStrategy(vacrel.bstrategy); - pfree(vacrel.indstats); -} - -/* - * Error context callback for errors occurring during vacuum. + * Error context callback for errors occurring during vacuum. The error + * context messages for index phases should match the messages set in parallel + * vacuum. If you change this function for those phases, change + * parallel_vacuum_error_callback() as well. */ static void vacuum_error_callback(void *arg) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index bb1881f573..ae7c7133dd 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -14,7 +14,6 @@ #include "postgres.h" -#include "access/heapam.h" #include "access/nbtree.h" #include "access/parallel.h" #include "access/session.h" @@ -25,6 +24,7 @@ #include "catalog/pg_enum.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/vacuum.h" #include "executor/execParallel.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index e8504f0ae4..48f7348f91 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -59,6 +59,7 @@ OBJS = \ typecmds.o \ user.o \ vacuum.o \ + vacuumparallel.o \ variable.o \ view.o diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 3b481bcf86..c94c187d36 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -7,8 +7,9 @@ * commands, (b) code to compute various vacuum thresholds, and (c) index * vacuum code. * - * VACUUM for heap AM is implemented in vacuumlazy.c, ANALYZE in analyze.c, and - * VACUUM FULL is a variant of CLUSTER, handled in cluster.c. + * VACUUM for heap AM is implemented in vacuumlazy.c, parallel vacuum in + * vacuumparallel.c, ANALYZE in analyze.c, and VACUUM FULL is a variant of + * CLUSTER, handled in cluster.c. * * * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c new file mode 100644 index 0000000000..5dd70c5273 --- /dev/null +++ b/src/backend/commands/vacuumparallel.c @@ -0,0 +1,1068 @@ +/*------------------------------------------------------------------------- + * + * vacuumparallel.c + * Support routines for parallel vacuum execution. + * + * This file contains routines that are intended to support setting up, using, + * and tearing down a ParallelVacuumState. + * + * In a parallel vacuum, we perform both index bulk deletion and index cleanup + * with parallel worker processes. Individual indexes are processed by one + * vacuum process. ParalleVacuumState contains shared information as well as + * the memory space for storing dead items allocated in the DSM segment. We + * launch parallel worker processes at the start of parallel index + * bulk-deletion and index cleanup and once all indexes are processed, the + * parallel worker processes exit. Each time we process indexes parallelly, + * the parallel context is re-initialized so that the same DSM can be used for + * multiple passes of index bulk-deletion and index cleanup. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/commands/vacuumparallel.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/amapi.h" +#include "access/table.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "optimizer/paths.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" + +/* + * DSM keys for parallel vacuum. Unlike other parallel execution code, since + * we don't need to worry about DSM keys conflicting with plan_node_id we can + * use small integers. + */ +#define PARALLEL_VACUUM_KEY_SHARED 1 +#define PARALLEL_VACUUM_KEY_DEAD_ITEMS 2 +#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 +#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4 +#define PARALLEL_VACUUM_KEY_WAL_USAGE 5 +#define PARALLEL_VACUUM_KEY_INDEX_STATS 6 + +/* + * Shared information among parallel workers. So this is allocated in the DSM + * segment. + */ +typedef struct PVShared +{ + /* + * Target table relid and log level. These fields are not modified during + * the parallel vacuum. + */ + Oid relid; + int elevel; + + /* + * Fields for both index vacuum and cleanup. + * + * reltuples is the total number of input heap tuples. We set either old + * live tuples in the index vacuum case or the new live tuples in the + * index cleanup case. + * + * estimated_count is true if reltuples is an estimated value. (Note that + * reltuples could be -1 in this case, indicating we have no idea.) + */ + double reltuples; + bool estimated_count; + + /* + * In single process vacuum we could consume more memory during index + * vacuuming or cleanup apart from the memory for heap scanning. In + * parallel vacuum, since individual vacuum workers can consume memory + * equal to maintenance_work_mem, the new maintenance_work_mem for each + * worker is set such that the parallel operation doesn't consume more + * memory than single process vacuum. + */ + int maintenance_work_mem_worker; + + /* + * Shared vacuum cost balance. During parallel vacuum, + * VacuumSharedCostBalance points to this value and it accumulates the + * balance of each parallel vacuum worker. + */ + pg_atomic_uint32 cost_balance; + + /* + * Number of active parallel workers. This is used for computing the + * minimum threshold of the vacuum cost balance before a worker sleeps for + * cost-based delay. + */ + pg_atomic_uint32 active_nworkers; + + /* Counter for vacuuming and cleanup */ + pg_atomic_uint32 idx; +} PVShared; + +/* Status used during parallel index vacuum or cleanup */ +typedef enum PVIndVacStatus +{ + PARALLEL_INDVAC_STATUS_INITIAL = 0, + PARALLEL_INDVAC_STATUS_NEED_BULKDELETE, + PARALLEL_INDVAC_STATUS_NEED_CLEANUP, + PARALLEL_INDVAC_STATUS_COMPLETED +} PVIndVacStatus; + +/* + * Struct for index vacuum statistics of an index that is used for parallel vacuum. + * This includes the status of parallel index vacuum as well as index statistics. + */ +typedef struct PVIndStats +{ + /* + * The following two fields are set by leader process before executing + * parallel index vacuum or parallel index cleanup. These fields are not + * fixed for the entire VACUUM operation. They are only fixed for an + * individual parallel index vacuum and cleanup. + * + * parallel_workers_can_process is true if both leader and worker can + * process the index, otherwise only leader can process it. + */ + PVIndVacStatus status; + bool parallel_workers_can_process; + + /* + * Individual worker or leader stores the result of index vacuum or + * cleanup. + */ + bool istat_updated; /* are the stats updated? */ + IndexBulkDeleteResult istat; +} PVIndStats; + +/* Struct for maintaining a parallel vacuum state. */ +typedef struct ParallelVacuumState +{ + /* NULL for worker processes */ + ParallelContext *pcxt; + + /* Target indexes */ + Relation *indrels; + int nindexes; + + /* Shared information among parallel vacuum workers */ + PVShared *shared; + + /* + * Shared index statistics among parallel vacuum workers. The array + * element is allocated for every index, even those indexes where parallel + * index vacuuming is unsafe or not worthwhile (e.g., + * will_parallel_vacuum[] is false). During parallel vacuum, + * IndexBulkDeleteResult of each index is kept in DSM and is copied into + * local memory at the end of parallel vacuum. + */ + PVIndStats *indstats; + + /* Shared dead items space among parallel vacuum workers */ + VacDeadItems *dead_items; + + /* Points to buffer usage area in DSM */ + BufferUsage *buffer_usage; + + /* Points to WAL usage area in DSM */ + WalUsage *wal_usage; + + /* + * False if the index is totally unsuitable target for all parallel + * processing. For example, the index could be < + * min_parallel_index_scan_size cutoff. + */ + bool *will_parallel_vacuum; + + /* + * The number of indexes that support parallel index bulk-deletion and + * parallel index cleanup respectively. + */ + int nindexes_parallel_bulkdel; + int nindexes_parallel_cleanup; + int nindexes_parallel_condcleanup; + + /* Buffer access strategy used by leader process */ + BufferAccessStrategy bstrategy; + + /* + * Error reporting state. The error callback is set only for workers + * processes during parallel index vacuum. + */ + char *relnamespace; + char *relname; + char *indname; + PVIndVacStatus status; +} ParallelVacuumState; + +static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, + bool *will_parallel_vacuum); +static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, + bool vacuum); +static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs); +static void parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs); +static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, + PVIndStats *indstats); +static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, + bool vacuum); +static void parallel_vacuum_error_callback(void *arg); + +/* + * Try to enter parallel mode and create a parallel context. Then initialize + * shared memory state. + * + * On success, return parallel vacuum state. Otherwise return NULL. + */ +ParallelVacuumState * +parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, + int nrequested_workers, int max_items, + int elevel, BufferAccessStrategy bstrategy) +{ + ParallelVacuumState *pvs; + ParallelContext *pcxt; + PVShared *shared; + VacDeadItems *dead_items; + PVIndStats *indstats; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + bool *will_parallel_vacuum; + Size est_indstats_len; + Size est_shared_len; + Size est_dead_items_len; + int nindexes_mwm = 0; + int parallel_workers = 0; + int querylen; + + /* + * A parallel vacuum must be requested and there must be indexes on the + * relation + */ + Assert(nrequested_workers >= 0); + Assert(nindexes > 0); + + /* + * Compute the number of parallel vacuum workers to launch + */ + will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); + parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes, + nrequested_workers, + will_parallel_vacuum); + if (parallel_workers <= 0) + { + /* Can't perform vacuum in parallel -- return NULL */ + pfree(will_parallel_vacuum); + return NULL; + } + + pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState)); + pvs->indrels = indrels; + pvs->nindexes = nindexes; + pvs->will_parallel_vacuum = will_parallel_vacuum; + pvs->bstrategy = bstrategy; + + EnterParallelMode(); + pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", + parallel_workers); + Assert(pcxt->nworkers > 0); + pvs->pcxt = pcxt; + + /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ + est_indstats_len = mul_size(sizeof(PVIndStats), nindexes); + shm_toc_estimate_chunk(&pcxt->estimator, est_indstats_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */ + est_shared_len = sizeof(PVShared); + shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate size for dead_items -- PARALLEL_VACUUM_KEY_DEAD_ITEMS */ + est_dead_items_len = vac_max_items_to_alloc_size(max_items); + shm_toc_estimate_chunk(&pcxt->estimator, est_dead_items_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* + * Estimate space for BufferUsage and WalUsage -- + * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE. + * + * If there are no extensions loaded that care, we could skip this. We + * have no way of knowing whether anyone's looking at pgBufferUsage or + * pgWalUsage, so do it unconditionally. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ + if (debug_query_string) + { + querylen = strlen(debug_query_string); + shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + else + querylen = 0; /* keep compiler quiet */ + + InitializeParallelDSM(pcxt); + + /* Prepare index vacuum stats */ + indstats = (PVIndStats *) shm_toc_allocate(pcxt->toc, est_indstats_len); + for (int i = 0; i < nindexes; i++) + { + Relation indrel = indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* + * Cleanup option should be either disabled, always performing in + * parallel or conditionally performing in parallel. + */ + Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); + Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); + + if (!will_parallel_vacuum[i]) + continue; + + if (indrel->rd_indam->amusemaintenanceworkmem) + nindexes_mwm++; + + /* + * Remember the number of indexes that support parallel operation for + * each phase. + */ + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + pvs->nindexes_parallel_bulkdel++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) + pvs->nindexes_parallel_cleanup++; + if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0) + pvs->nindexes_parallel_condcleanup++; + } + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INDEX_STATS, indstats); + pvs->indstats = indstats; + + /* Prepare shared information */ + shared = (PVShared *) shm_toc_allocate(pcxt->toc, est_shared_len); + MemSet(shared, 0, est_shared_len); + shared->relid = RelationGetRelid(rel); + shared->elevel = elevel; + shared->maintenance_work_mem_worker = + (nindexes_mwm > 0) ? + maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : + maintenance_work_mem; + + pg_atomic_init_u32(&(shared->cost_balance), 0); + pg_atomic_init_u32(&(shared->active_nworkers), 0); + pg_atomic_init_u32(&(shared->idx), 0); + + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); + pvs->shared = shared; + + /* Prepare the dead_items space */ + dead_items = (VacDeadItems *) shm_toc_allocate(pcxt->toc, + est_dead_items_len); + dead_items->max_items = max_items; + dead_items->num_items = 0; + MemSet(dead_items->items, 0, sizeof(ItemPointerData) * max_items); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_ITEMS, dead_items); + pvs->dead_items = dead_items; + + /* + * Allocate space for each worker's BufferUsage and WalUsage; no need to + * initialize + */ + buffer_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(BufferUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); + pvs->buffer_usage = buffer_usage; + wal_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); + pvs->wal_usage = wal_usage; + + /* Store query string for workers */ + if (debug_query_string) + { + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + sharedquery[querylen] = '\0'; + shm_toc_insert(pcxt->toc, + PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); + } + + /* Success -- return parallel vacuum state */ + return pvs; +} + +/* + * Destroy the parallel context, and end parallel mode. + * + * Since writes are not allowed during parallel mode, copy the + * updated index statistics from DSM into local memory and then later use that + * to update the index statistics. One might think that we can exit from + * parallel mode, update the index statistics and then destroy parallel + * context, but that won't be safe (see ExitParallelMode). + */ +void +parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) +{ + Assert(!IsParallelWorker()); + + /* Copy the updated statistics */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + if (indstats->istat_updated) + { + istats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); + memcpy(istats[i], &indstats->istat, sizeof(IndexBulkDeleteResult)); + } + else + istats[i] = NULL; + } + + DestroyParallelContext(pvs->pcxt); + ExitParallelMode(); + + pfree(pvs->will_parallel_vacuum); + pfree(pvs); +} + +/* Returns the dead items space */ +VacDeadItems * +parallel_vacuum_get_dead_items(ParallelVacuumState *pvs) +{ + return pvs->dead_items; +} + +/* + * Do parallel index bulk-deletion with parallel workers. + */ +void +parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, + int num_index_scans) +{ + Assert(!IsParallelWorker()); + + /* + * We can only provide an approximate value of num_heap_tuples, at least + * for now. + */ + pvs->shared->reltuples = num_table_tuples; + pvs->shared->estimated_count = true; + + parallel_vacuum_process_all_indexes(pvs, num_index_scans, true); +} + +/* + * Do parallel index cleanup with parallel workers. + */ +void +parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, + int num_index_scans, bool estimated_count) +{ + Assert(!IsParallelWorker()); + + /* + * We can provide a better estimate of total number of surviving tuples + * (we assume indexes are more interested in that than in the number of + * nominally live tuples). + */ + pvs->shared->reltuples = num_table_tuples; + pvs->shared->estimated_count = estimated_count; + + parallel_vacuum_process_all_indexes(pvs, num_index_scans, false); +} + +/* + * Compute the number of parallel worker processes to request. Both index + * vacuum and index cleanup can be executed with parallel workers. + * The index is eligible for parallel vacuum iff its size is greater than + * min_parallel_index_scan_size as invoking workers for very small indexes + * can hurt performance. + * + * nrequested is the number of parallel workers that user requested. If + * nrequested is 0, we compute the parallel degree based on nindexes, that is + * the number of indexes that support parallel vacuum. This function also + * sets will_parallel_vacuum to remember indexes that participate in parallel + * vacuum. + */ +static int +parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, + bool *will_parallel_vacuum) +{ + int nindexes_parallel = 0; + int nindexes_parallel_bulkdel = 0; + int nindexes_parallel_cleanup = 0; + int parallel_workers; + + /* + * We don't allow performing parallel operation in standalone backend or + * when parallelism is disabled. + */ + if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) + return 0; + + /* + * Compute the number of indexes that can participate in parallel vacuum. + */ + for (int i = 0; i < nindexes; i++) + { + Relation indrel = indrels[i]; + uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* Skip index that is not a suitable target for parallel index vacuum */ + if (vacoptions == VACUUM_OPTION_NO_PARALLEL || + RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) + continue; + + will_parallel_vacuum[i] = true; + + if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) + nindexes_parallel_bulkdel++; + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) || + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + nindexes_parallel_cleanup++; + } + + nindexes_parallel = Max(nindexes_parallel_bulkdel, + nindexes_parallel_cleanup); + + /* The leader process takes one index */ + nindexes_parallel--; + + /* No index supports parallel vacuum */ + if (nindexes_parallel <= 0) + return 0; + + /* Compute the parallel degree */ + parallel_workers = (nrequested > 0) ? + Min(nrequested, nindexes_parallel) : nindexes_parallel; + + /* Cap by max_parallel_maintenance_workers */ + parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + + return parallel_workers; +} + +/* + * Perform index vacuum or index cleanup with parallel workers. This function + * must be used by the parallel vacuum leader process. + */ +static void +parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, + bool vacuum) +{ + int nworkers; + PVIndVacStatus new_status; + + Assert(!IsParallelWorker()); + + if (vacuum) + { + new_status = PARALLEL_INDVAC_STATUS_NEED_BULKDELETE; + + /* Determine the number of parallel workers to launch */ + nworkers = pvs->nindexes_parallel_bulkdel; + } + else + { + new_status = PARALLEL_INDVAC_STATUS_NEED_CLEANUP; + + /* Determine the number of parallel workers to launch */ + nworkers = pvs->nindexes_parallel_cleanup; + + /* Add conditionally parallel-aware indexes if in the first time call */ + if (num_index_scans == 0) + nworkers += pvs->nindexes_parallel_condcleanup; + } + + /* The leader process will participate */ + nworkers--; + + /* + * It is possible that parallel context is initialized with fewer workers + * than the number of indexes that need a separate worker in the current + * phase, so we need to consider it. See + * parallel_vacuum_compute_workers(). + */ + nworkers = Min(nworkers, pvs->pcxt->nworkers); + + /* + * Set index vacuum status and mark whether parallel vacuum worker can + * process it. + */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL); + indstats->status = new_status; + indstats->parallel_workers_can_process = + (pvs->will_parallel_vacuum[i] & + parallel_vacuum_index_is_parallel_safe(pvs->indrels[i], + num_index_scans, + vacuum)); + } + + /* Reset the parallel index processing counter */ + pg_atomic_write_u32(&(pvs->shared->idx), 0); + + /* Setup the shared cost-based vacuum delay and launch workers */ + if (nworkers > 0) + { + /* Reinitialize parallel context to relaunch parallel workers */ + if (num_index_scans > 0) + ReinitializeParallelDSM(pvs->pcxt); + + /* + * Set up shared cost balance and the number of active workers for + * vacuum delay. We need to do this before launching workers as + * otherwise, they might not see the updated values for these + * parameters. + */ + pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); + pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); + + /* + * The number of workers can vary between bulkdelete and cleanup + * phase. + */ + ReinitializeParallelWorkers(pvs->pcxt, nworkers); + + LaunchParallelWorkers(pvs->pcxt); + + if (pvs->pcxt->nworkers_launched > 0) + { + /* + * Reset the local cost values for leader backend as we have + * already accumulated the remaining balance of heap. + */ + VacuumCostBalance = 0; + VacuumCostBalanceLocal = 0; + + /* Enable shared cost balance for leader backend */ + VacuumSharedCostBalance = &(pvs->shared->cost_balance); + VacuumActiveNWorkers = &(pvs->shared->active_nworkers); + } + + if (vacuum) + ereport(pvs->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)", + "launched %d parallel vacuum workers for index vacuuming (planned: %d)", + pvs->pcxt->nworkers_launched), + pvs->pcxt->nworkers_launched, nworkers))); + else + ereport(pvs->shared->elevel, + (errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)", + "launched %d parallel vacuum workers for index cleanup (planned: %d)", + pvs->pcxt->nworkers_launched), + pvs->pcxt->nworkers_launched, nworkers))); + } + + /* Vacuum the indexes that can be processed by only leader process */ + parallel_vacuum_process_unsafe_indexes(pvs); + + /* + * Join as a parallel worker. The leader vacuums alone processes all + * parallel-safe indexes in the case where no workers are launched. + */ + parallel_vacuum_process_safe_indexes(pvs); + + /* + * Next, accumulate buffer and WAL usage. (This must wait for the workers + * to finish, or we might get incomplete data.) + */ + if (nworkers > 0) + { + /* Wait for all vacuum workers to finish */ + WaitForParallelWorkersToFinish(pvs->pcxt); + + for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + } + + /* + * Reset all index status back to initial (while checking that we have + * vacuumed all indexes). + */ + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + if (indstats->status != PARALLEL_INDVAC_STATUS_COMPLETED) + elog(ERROR, "parallel index vacuum on index \"%s\" is not completed", + RelationGetRelationName(pvs->indrels[i])); + + indstats->status = PARALLEL_INDVAC_STATUS_INITIAL; + } + + /* + * Carry the shared balance value to heap scan and disable shared costing + */ + if (VacuumSharedCostBalance) + { + VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; + } +} + +/* + * Index vacuum/cleanup routine used by the leader process and parallel + * vacuum worker processes to vacuum the indexes in parallel. + */ +static void +parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs) +{ + /* + * Increment the active worker count if we are able to launch any worker. + */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + /* Loop until all indexes are vacuumed */ + for (;;) + { + int idx; + PVIndStats *indstats; + + /* Get an index number to process */ + idx = pg_atomic_fetch_add_u32(&(pvs->shared->idx), 1); + + /* Done for all indexes? */ + if (idx >= pvs->nindexes) + break; + + indstats = &(pvs->indstats[idx]); + + /* + * Skip vacuuming index that is unsafe for workers or has an + * unsuitable target for parallel index vacuum (this is vacuumed in + * parallel_vacuum_process_unsafe_indexes() by the leader). + */ + if (!indstats->parallel_workers_can_process) + continue; + + /* Do vacuum or cleanup of the index */ + parallel_vacuum_process_one_index(pvs, pvs->indrels[idx], indstats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Perform parallel vacuuming of indexes in leader process. + * + * Handles index vacuuming (or index cleanup) for indexes that are not + * parallel safe. It's possible that this will vary for a given index, based + * on details like whether we're performing index cleanup right now. + * + * Also performs vacuuming of smaller indexes that fell under the size cutoff + * enforced by parallel_vacuum_compute_workers(). + */ +static void +parallel_vacuum_process_unsafe_indexes(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + + /* + * Increment the active worker count if we are able to launch any worker. + */ + if (VacuumActiveNWorkers) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + for (int i = 0; i < pvs->nindexes; i++) + { + PVIndStats *indstats = &(pvs->indstats[i]); + + /* Skip, indexes that are safe for workers */ + if (indstats->parallel_workers_can_process) + continue; + + /* Do vacuum or cleanup of the index */ + parallel_vacuum_process_one_index(pvs, pvs->indrels[i], indstats); + } + + /* + * We have completed the index vacuum so decrement the active worker + * count. + */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * Vacuum or cleanup index either by leader process or by one of the worker + * process. After vacuuming the index this function copies the index + * statistics returned from ambulkdelete and amvacuumcleanup to the DSM + * segment. + */ +static void +parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, + PVIndStats *indstats) +{ + IndexBulkDeleteResult *istat = NULL; + IndexBulkDeleteResult *istat_res; + IndexVacuumInfo ivinfo; + + /* + * Update the pointer to the corresponding bulk-deletion result if someone + * has already updated it + */ + if (indstats->istat_updated) + istat = &(indstats->istat); + + ivinfo.index = indrel; + ivinfo.analyze_only = false; + ivinfo.report_progress = false; + ivinfo.message_level = pvs->shared->elevel; + ivinfo.estimated_count = pvs->shared->estimated_count; + ivinfo.num_heap_tuples = pvs->shared->reltuples; + ivinfo.strategy = pvs->bstrategy; + + /* Update error traceback information */ + pvs->indname = pstrdup(RelationGetRelationName(indrel)); + pvs->status = indstats->status; + + switch (indstats->status) + { + case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: + istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items); + break; + case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: + istat_res = vac_cleanup_one_index(&ivinfo, istat); + break; + default: + elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", + indstats->status, + RelationGetRelationName(indrel)); + } + + /* + * Copy the index bulk-deletion result returned from ambulkdelete and + * amvacuumcleanup to the DSM segment if it's the first cycle because they + * allocate locally and it's possible that an index will be vacuumed by a + * different vacuum process the next cycle. Copying the result normally + * happens only the first time an index is vacuumed. For any additional + * vacuum pass, we directly point to the result on the DSM segment and + * pass it to vacuum index APIs so that workers can update it directly. + * + * Since all vacuum workers write the bulk-deletion result at different + * slots we can write them without locking. + */ + if (!indstats->istat_updated && istat_res != NULL) + { + memcpy(&(indstats->istat), istat_res, sizeof(IndexBulkDeleteResult)); + indstats->istat_updated = true; + + /* Free the locally-allocated bulk-deletion result */ + pfree(istat_res); + } + + /* + * Update the status to completed. No need to lock here since each worker + * touches different indexes. + */ + indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED; + + /* Reset error traceback information */ + pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED; + pfree(pvs->indname); + pvs->indname = NULL; +} + +/* + * Returns false, if the given index can't participate in the next execution of + * parallel index vacuum or parallel index cleanup. + */ +static bool +parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, + bool vacuum) +{ + uint8 vacoptions; + + vacoptions = indrel->rd_indam->amparallelvacuumoptions; + + /* In parallel vacuum case, check if it supports parallel bulk-deletion */ + if (vacuum) + return ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0); + + /* Not safe, if the index does not support parallel cleanup */ + if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)) + return false; + + /* + * Not safe, if the index supports parallel cleanup conditionally, but we + * have already processed the index (for bulkdelete). We do this to avoid + * the need to invoke workers when parallel index cleanup doesn't need to + * scan the index. See the comments for option + * VACUUM_OPTION_PARALLEL_COND_CLEANUP to know when indexes support + * parallel cleanup conditionally. + */ + if (num_index_scans > 0 && + ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)) + return false; + + return true; +} + +/* + * Perform work within a launched parallel process. + * + * Since parallel vacuum workers perform only index vacuum or index cleanup, + * we don't need to report progress information. + */ +void +parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelVacuumState pvs; + Relation rel; + Relation *indrels; + PVIndStats *indstats; + PVShared *shared; + VacDeadItems *dead_items; + BufferUsage *buffer_usage; + WalUsage *wal_usage; + int nindexes; + char *sharedquery; + ErrorContextCallback errcallback; + + /* + * A parallel vacuum worker must have only PROC_IN_VACUUM flag since we + * don't support parallel vacuum for autovacuum as of now. + */ + Assert(MyProc->statusFlags == PROC_IN_VACUUM); + + elog(DEBUG1, "starting parallel vacuum worker"); + + shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); + + /* Set debug_query_string for individual workers */ + sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); + debug_query_string = sharedquery; + pgstat_report_activity(STATE_RUNNING, debug_query_string); + + /* + * Open table. The lock mode is the same as the leader process. It's + * okay because the lock mode does not conflict among the parallel + * workers. + */ + rel = table_open(shared->relid, ShareUpdateExclusiveLock); + + /* + * Open all indexes. indrels are sorted in order by OID, which should be + * matched to the leader's one. + */ + vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); + Assert(nindexes > 0); + + if (shared->maintenance_work_mem_worker > 0) + maintenance_work_mem = shared->maintenance_work_mem_worker; + + /* Set index statistics */ + indstats = (PVIndStats *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_INDEX_STATS, + false); + + /* Set dead_items space */ + dead_items = (VacDeadItems *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_DEAD_ITEMS, + false); + + /* Set cost-based vacuum delay */ + VacuumCostActive = (VacuumCostDelay > 0); + VacuumCostBalance = 0; + VacuumPageHit = 0; + VacuumPageMiss = 0; + VacuumPageDirty = 0; + VacuumCostBalanceLocal = 0; + VacuumSharedCostBalance = &(shared->cost_balance); + VacuumActiveNWorkers = &(shared->active_nworkers); + + /* Set parallel vacuum state */ + pvs.indrels = indrels; + pvs.nindexes = nindexes; + pvs.indstats = indstats; + pvs.shared = shared; + pvs.dead_items = dead_items; + pvs.relnamespace = get_namespace_name(RelationGetNamespace(rel)); + pvs.relname = pstrdup(RelationGetRelationName(rel)); + + /* These fields will be filled during index vacuum or cleanup */ + pvs.indname = NULL; + pvs.status = PARALLEL_INDVAC_STATUS_INITIAL; + + /* Each parallel VACUUM worker gets its own access strategy */ + pvs.bstrategy = GetAccessStrategy(BAS_VACUUM); + + /* Setup error traceback support for ereport() */ + errcallback.callback = parallel_vacuum_error_callback; + errcallback.arg = &pvs; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* Prepare to track buffer usage during parallel execution */ + InstrStartParallelQuery(); + + /* Process indexes to perform vacuum/cleanup */ + parallel_vacuum_process_safe_indexes(&pvs); + + /* Report buffer/WAL usage during parallel execution */ + buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); + wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; + + vac_close_indexes(nindexes, indrels, RowExclusiveLock); + table_close(rel, ShareUpdateExclusiveLock); + FreeAccessStrategy(pvs.bstrategy); +} + +/* + * Error context callback for errors occurring during parallel index vacuum. + * The error context messages should match the messages set in the lazy vacuum + * error context. If you change this function, change vacuum_error_callback() + * as well. + */ +static void +parallel_vacuum_error_callback(void *arg) +{ + ParallelVacuumState *errinfo = arg; + + switch (errinfo->status) + { + case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: + errcontext("while vacuuming index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: + errcontext("while cleaning up index \"%s\" of relation \"%s.%s\"", + errinfo->indname, + errinfo->relnamespace, + errinfo->relname); + break; + case PARALLEL_INDVAC_STATUS_INITIAL: + case PARALLEL_INDVAC_STATUS_COMPLETED: + default: + return; + } +} diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 417dd288e5..f3fb1e93a5 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -198,7 +198,6 @@ extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets); struct VacuumParams; extern void heap_vacuum_rel(Relation rel, struct VacuumParams *params, BufferAccessStrategy bstrategy); -extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in heap/heapam_visibility.c */ extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot, diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 97bffa8ff1..5a36049be6 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -16,6 +16,7 @@ #include "access/htup.h" #include "access/genam.h" +#include "access/parallel.h" #include "catalog/pg_class.h" #include "catalog/pg_statistic.h" #include "catalog/pg_type.h" @@ -63,6 +64,9 @@ /* value for checking vacuum flags */ #define VACUUM_OPTION_MAX_VALID_VALUE ((1 << 3) - 1) +/* Abstract type for parallel vacuum state */ +typedef struct ParallelVacuumState ParallelVacuumState; + /*---------- * ANALYZE builds one of these structs for each attribute (column) that is * to be analyzed. The struct and subsidiary data are in anl_context, @@ -305,6 +309,22 @@ extern IndexBulkDeleteResult *vac_cleanup_one_index(IndexVacuumInfo *ivinfo, IndexBulkDeleteResult *istat); extern Size vac_max_items_to_alloc_size(int max_items); +/* in commands/vacuumparallel.c */ +extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels, + int nindexes, int nrequested_workers, + int max_items, int elevel, + BufferAccessStrategy bstrategy); +extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats); +extern VacDeadItems *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs); +extern void parallel_vacuum_bulkdel_all_indexes(ParallelVacuumState *pvs, + long num_table_tuples, + int num_index_scans); +extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, + long num_table_tuples, + int num_index_scans, + bool estimated_count); +extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); + /* in commands/analyze.c */ extern void analyze_rel(Oid relid, RangeVar *relation, VacuumParams *params, List *va_cols, bool in_outer_xact, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9863508791..f093605472 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1306,13 +1306,8 @@ LPWSTR LSEG LUID LVPagePruneState -LVParallelIndStats -LVParallelIndVacStatus -LVParallelState LVRelState LVSavedErrInfo -LVShared -LVSharedIndStats LWLock LWLockHandle LWLockMode @@ -1775,7 +1770,10 @@ PTIterationArray PTOKEN_PRIVILEGES PTOKEN_USER PUTENVPROC +PVIndStats +PvIndVacStatus PVOID +PVShared PX_Alias PX_Cipher PX_Combo @@ -1809,6 +1807,7 @@ ParallelSlotResultHandler ParallelState ParallelTableScanDesc ParallelTableScanDescData +ParallelVacuumState ParallelWorkerContext ParallelWorkerInfo Param