From 9bfd44bbde4261181bf94738f3b041c629c65a7e Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Tue, 19 Sep 2023 14:31:29 -0700 Subject: [PATCH] Convert pg_restore's ready_list to a priority queue. Presently, parallel restores spend a lot of time sorting this list so that we pick the largest items first. With many tables, this sorting can become a significant bottleneck. There are a couple of reports from the field about this, and it is easy to reproduce. This commit improves the performance of parallel pg_restore with many tables by converting its ready_list to a priority queue, i.e., a binary heap. We will first try to run the highest priority item, but if it cannot be chosen due to the lock heuristic, we'll do a sequential scan through the heap nodes until we find one that is runnable. This means that we might end up picking an item with a much lower priority. However, we expect that we will typically be able to pick one of the first few items, which should usually have a relatively high priority. Suggested-by: Tom Lane Tested-by: Pierre Ducroquet Reviewed-by: Tom Lane Discussion: https://postgr.es/m/3612876.1689443232%40sss.pgh.pa.us --- src/bin/pg_dump/pg_backup_archiver.c | 198 ++++++++------------------- 1 file changed, 58 insertions(+), 140 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 4d83381d84..cc06f1c817 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -34,6 +34,7 @@ #include "compress_io.h" #include "dumputils.h" #include "fe_utils/string_utils.h" +#include "lib/binaryheap.h" #include "lib/stringinfo.h" #include "libpq/libpq-fs.h" #include "parallel.h" @@ -44,24 +45,6 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" -/* - * State for tracking TocEntrys that are ready to process during a parallel - * restore. (This used to be a list, and we still call it that, though now - * it's really an array so that we can apply qsort to it.) - * - * tes[] is sized large enough that we can't overrun it. - * The valid entries are indexed first_te .. last_te inclusive. - * We periodically sort the array to bring larger-by-dataLength entries to - * the front; "sorted" is true if the valid entries are known sorted. - */ -typedef struct _parallelReadyList -{ - TocEntry **tes; /* Ready-to-dump TocEntrys */ - int first_te; /* index of first valid entry in tes[] */ - int last_te; /* index of last valid entry in tes[] */ - bool sorted; /* are valid entries currently sorted? */ -} ParallelReadyList; - static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, @@ -111,16 +94,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH, static void pending_list_header_init(TocEntry *l); static void pending_list_append(TocEntry *l, TocEntry *te); static void pending_list_remove(TocEntry *te); -static void ready_list_init(ParallelReadyList *ready_list, int tocCount); -static void ready_list_free(ParallelReadyList *ready_list); -static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); -static void ready_list_remove(ParallelReadyList *ready_list, int i); -static void ready_list_sort(ParallelReadyList *ready_list); -static int TocEntrySizeCompare(const void *p1, const void *p2); -static void move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +static int TocEntrySizeCompareQsort(const void *p1, const void *p2); +static int TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg); +static void move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass); -static TocEntry *pop_next_work_item(ParallelReadyList *ready_list, +static TocEntry *pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -135,7 +114,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list); + binaryheap *ready_heap); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -2384,7 +2363,7 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) } if (ntes > 1) - qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompare); + qsort(tes, ntes, sizeof(TocEntry *), TocEntrySizeCompareQsort); for (int i = 0; i < ntes; i++) DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, @@ -3984,7 +3963,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) (void) restore_toc_entry(AH, next_work_item, false); - /* Reduce dependencies, but don't move anything to ready_list */ + /* Reduce dependencies, but don't move anything to ready_heap */ reduce_dependencies(AH, next_work_item, NULL); } else @@ -4027,24 +4006,26 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - ParallelReadyList ready_list; + binaryheap *ready_heap; TocEntry *next_work_item; pg_log_debug("entering restore_toc_entries_parallel"); - /* Set up ready_list with enough room for all known TocEntrys */ - ready_list_init(&ready_list, AH->tocCount); + /* Set up ready_heap with enough room for all known TocEntrys */ + ready_heap = binaryheap_allocate(AH->tocCount, + TocEntrySizeCompareBinaryheap, + NULL); /* * The pending_list contains all items that we need to restore. Move all - * items that are available to process immediately into the ready_list. + * items that are available to process immediately into the ready_heap. * After this setup, the pending list is everything that needs to be done - * but is blocked by one or more dependencies, while the ready list + * but is blocked by one or more dependencies, while the ready heap * contains items that have no remaining dependencies and are OK to * process in the current restore pass. */ AH->restorePass = RESTORE_PASS_MAIN; - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* * main parent loop @@ -4058,7 +4039,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = pop_next_work_item(&ready_list, pstate); + next_work_item = pop_next_work_item(ready_heap, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4068,7 +4049,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); /* Update its dependencies as though we'd completed it */ - reduce_dependencies(AH, next_work_item, &ready_list); + reduce_dependencies(AH, next_work_item, ready_heap); /* Loop around to see if anything else can be dispatched */ continue; } @@ -4079,7 +4060,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, - mark_restore_job_done, &ready_list); + mark_restore_job_done, ready_heap); } else if (IsEveryWorkerIdle(pstate)) { @@ -4093,7 +4074,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, /* Advance to next restore pass */ AH->restorePass++; /* That probably allows some stuff to be made ready */ - move_to_ready_list(pending_list, &ready_list, AH->restorePass); + move_to_ready_heap(pending_list, ready_heap, AH->restorePass); /* Loop around to see if anything's now ready */ continue; } @@ -4122,10 +4103,10 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS); } - /* There should now be nothing in ready_list. */ - Assert(ready_list.first_te > ready_list.last_te); + /* There should now be nothing in ready_heap. */ + Assert(binaryheap_empty(ready_heap)); - ready_list_free(&ready_list); + binaryheap_free(ready_heap); pg_log_info("finished main parallel loop"); } @@ -4225,80 +4206,9 @@ pending_list_remove(TocEntry *te) } -/* - * Initialize the ready_list with enough room for up to tocCount entries. - */ -static void -ready_list_init(ParallelReadyList *ready_list, int tocCount) -{ - ready_list->tes = (TocEntry **) - pg_malloc(tocCount * sizeof(TocEntry *)); - ready_list->first_te = 0; - ready_list->last_te = -1; - ready_list->sorted = false; -} - -/* - * Free storage for a ready_list. - */ -static void -ready_list_free(ParallelReadyList *ready_list) -{ - pg_free(ready_list->tes); -} - -/* Add te to the ready_list */ -static void -ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) -{ - ready_list->tes[++ready_list->last_te] = te; - /* List is (probably) not sorted anymore. */ - ready_list->sorted = false; -} - -/* Remove the i'th entry in the ready_list */ -static void -ready_list_remove(ParallelReadyList *ready_list, int i) -{ - int f = ready_list->first_te; - - Assert(i >= f && i <= ready_list->last_te); - - /* - * In the typical case where the item to be removed is the first ready - * entry, we need only increment first_te to remove it. Otherwise, move - * the entries before it to compact the list. (This preserves sortedness, - * if any.) We could alternatively move the entries after i, but there - * are typically many more of those. - */ - if (i > f) - { - TocEntry **first_te_ptr = &ready_list->tes[f]; - - memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); - } - ready_list->first_te++; -} - -/* Sort the ready_list into the desired order */ -static void -ready_list_sort(ParallelReadyList *ready_list) -{ - if (!ready_list->sorted) - { - int n = ready_list->last_te - ready_list->first_te + 1; - - if (n > 1) - qsort(ready_list->tes + ready_list->first_te, n, - sizeof(TocEntry *), - TocEntrySizeCompare); - ready_list->sorted = true; - } -} - /* qsort comparator for sorting TocEntries by dataLength */ static int -TocEntrySizeCompare(const void *p1, const void *p2) +TocEntrySizeCompareQsort(const void *p1, const void *p2) { const TocEntry *te1 = *(const TocEntry *const *) p1; const TocEntry *te2 = *(const TocEntry *const *) p2; @@ -4318,17 +4228,25 @@ TocEntrySizeCompare(const void *p1, const void *p2) return 0; } +/* binaryheap comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompareBinaryheap(void *p1, void *p2, void *arg) +{ + /* return opposite of qsort comparator for max-heap */ + return -TocEntrySizeCompareQsort(&p1, &p2); +} + /* - * Move all immediately-ready items from pending_list to ready_list. + * Move all immediately-ready items from pending_list to ready_heap. * * Items are considered ready if they have no remaining dependencies and * they belong in the current restore pass. (See also reduce_dependencies, * which applies the same logic one-at-a-time.) */ static void -move_to_ready_list(TocEntry *pending_list, - ParallelReadyList *ready_list, +move_to_ready_heap(TocEntry *pending_list, + binaryheap *ready_heap, RestorePass pass) { TocEntry *te; @@ -4344,38 +4262,38 @@ move_to_ready_list(TocEntry *pending_list, { /* Remove it from pending_list ... */ pending_list_remove(te); - /* ... and add to ready_list */ - ready_list_insert(ready_list, te); + /* ... and add to ready_heap */ + binaryheap_add(ready_heap, te); } } } /* * Find the next work item (if any) that is capable of being run now, - * and remove it from the ready_list. + * and remove it from the ready_heap. * * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with - * items currently running. Items in the ready_list are known to have + * items currently running. Items in the ready_heap are known to have * no remaining dependencies, but we have to check for lock conflicts. */ static TocEntry * -pop_next_work_item(ParallelReadyList *ready_list, +pop_next_work_item(binaryheap *ready_heap, ParallelState *pstate) { /* - * Sort the ready_list so that we'll tackle larger jobs first. + * Search the ready_heap until we find a suitable item. Note that we do a + * sequential scan through the heap nodes, so even though we will first + * try to choose the highest-priority item, we might end up picking + * something with a much lower priority. However, we expect that we will + * typically be able to pick one of the first few items, which should + * usually have a relatively high priority. */ - ready_list_sort(ready_list); - - /* - * Search the ready_list until we find a suitable item. - */ - for (int i = ready_list->first_te; i <= ready_list->last_te; i++) + for (int i = 0; i < binaryheap_size(ready_heap); i++) { - TocEntry *te = ready_list->tes[i]; + TocEntry *te = (TocEntry *) binaryheap_get_node(ready_heap, i); bool conflicts = false; /* @@ -4401,7 +4319,7 @@ pop_next_work_item(ParallelReadyList *ready_list, continue; /* passed all tests, so this item can run */ - ready_list_remove(ready_list, i); + binaryheap_remove_node(ready_heap, i); return te; } @@ -4447,7 +4365,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; + binaryheap *ready_heap = (binaryheap *) callback_data; pg_log_info("finished item %d %s %s", te->dumpId, te->desc, te->tag); @@ -4465,7 +4383,7 @@ mark_restore_job_done(ArchiveHandle *AH, pg_fatal("worker process failed: exit code %d", status); - reduce_dependencies(AH, te, ready_list); + reduce_dependencies(AH, te, ready_heap); } @@ -4708,11 +4626,11 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) /* * Remove the specified TOC entry from the depCounts of items that depend on * it, thereby possibly making them ready-to-run. Any pending item that - * becomes ready should be moved to the ready_list, if that's provided. + * becomes ready should be moved to the ready_heap, if that's provided. */ static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - ParallelReadyList *ready_list) + binaryheap *ready_heap) { int i; @@ -4730,18 +4648,18 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, * the current restore pass, and it is currently a member of the * pending list (that check is needed to prevent double restore in * some cases where a list-file forces out-of-order restoring). - * However, if ready_list == NULL then caller doesn't want any list + * However, if ready_heap == NULL then caller doesn't want any list * memberships changed. */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && otherte->pending_prev != NULL && - ready_list != NULL) + ready_heap != NULL) { /* Remove it from pending list ... */ pending_list_remove(otherte); - /* ... and add to ready_list */ - ready_list_insert(ready_list, otherte); + /* ... and add to ready_heap */ + binaryheap_add(ready_heap, otherte); } } }