diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 42cf441aaf..ba798213be 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -252,18 +252,6 @@ extern void ConnectDatabase(Archive *AH, extern void DisconnectDatabase(Archive *AHX); extern PGconn *GetConnection(Archive *AHX); -/* Called to add a TOC entry */ -extern void ArchiveEntry(Archive *AHX, - CatalogId catalogId, DumpId dumpId, - const char *tag, - const char *namespace, const char *tablespace, - const char *owner, bool withOids, - const char *desc, teSection section, - const char *defn, - const char *dropStmt, const char *copyStmt, - const DumpId *deps, int nDeps, - DataDumperPtr dumpFn, void *dumpArg); - /* Called to write *data* to the archive */ extern void WriteData(Archive *AH, const void *data, size_t dLen); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 36e3383b85..3f7a658bce 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -49,6 +49,24 @@ typedef struct _outputContext int gzOut; } OutputContext; +/* + * State for tracking TocEntrys that are ready to process during a parallel + * restore. (This used to be a list, and we still call it that, though now + * it's really an array so that we can apply qsort to it.) + * + * tes[] is sized large enough that we can't overrun it. + * The valid entries are indexed first_te .. last_te inclusive. + * We periodically sort the array to bring larger-by-dataLength entries to + * the front; "sorted" is true if the valid entries are known sorted. + */ +typedef struct _parallelReadyList +{ + TocEntry **tes; /* Ready-to-dump TocEntrys */ + int first_te; /* index of first valid entry in tes[] */ + int last_te; /* index of last valid entry in tes[] */ + bool sorted; /* are valid entries currently sorted? 
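(insertion clears this flag; ready_list_sort re-establishes it)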
*/ +} ParallelReadyList; + /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver"); @@ -95,13 +113,20 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, TocEntry *pending_list); static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); -static void par_list_header_init(TocEntry *l); -static void par_list_append(TocEntry *l, TocEntry *te); -static void par_list_remove(TocEntry *te); -static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +static void pending_list_header_init(TocEntry *l); +static void pending_list_append(TocEntry *l, TocEntry *te); +static void pending_list_remove(TocEntry *te); +static void ready_list_init(ParallelReadyList *ready_list, int tocCount); +static void ready_list_free(ParallelReadyList *ready_list); +static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te); +static void ready_list_remove(ParallelReadyList *ready_list, int i); +static void ready_list_sort(ParallelReadyList *ready_list); +static int TocEntrySizeCompare(const void *p1, const void *p2); +static void move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass); -static TocEntry *get_next_work_item(ArchiveHandle *AH, - TocEntry *ready_list, +static TocEntry *pop_next_work_item(ArchiveHandle *AH, + ParallelReadyList *ready_list, ParallelState *pstate); static void mark_dump_job_done(ArchiveHandle *AH, TocEntry *te, @@ -116,7 +141,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2); static void repoint_table_dependencies(ArchiveHandle *AH); static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te); static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te, - TocEntry *ready_list); + ParallelReadyList *ready_list); static void mark_create_done(ArchiveHandle *AH, TocEntry *te); static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te); @@ -639,7 +664,11 @@ RestoreArchive(Archive *AHX) ParallelState *pstate; TocEntry pending_list; - par_list_header_init(&pending_list); + /* The archive format module may need some setup for this */ + if (AH->PrepParallelRestorePtr) + AH->PrepParallelRestorePtr(AH); + + pending_list_header_init(&pending_list); /* This runs PRE_DATA items and then disconnects from the database */ restore_toc_entries_prefork(AH, &pending_list); @@ -1039,10 +1068,14 @@ WriteData(Archive *AHX, const void *data, size_t dLen) /* * Create a new TOC entry. The TOC was designed as a TOC, but is now the * repository for all metadata. But the name has stuck. + * + * The new entry is added to the Archive's TOC list. Most callers can ignore + * the result value because nothing else need be done, but a few want to + * manipulate the TOC entry further. */ /* Public */ -void +TocEntry * ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, const char *tag, @@ -1100,9 +1133,12 @@ ArchiveEntry(Archive *AHX, newToc->hadDumper = dumpFn ? true : false; newToc->formatData = NULL; + newToc->dataLength = 0; if (AH->ArchiveEntryPtr != NULL) AH->ArchiveEntryPtr(AH, newToc); + + return newToc; } /* Public */ @@ -2413,32 +2449,59 @@ WriteDataChunks(ArchiveHandle *AH, ParallelState *pstate) { TocEntry *te; - for (te = AH->toc->next; te != AH->toc; te = te->next) - { - if (!te->dataDumper) - continue; - - if ((te->reqs & REQ_DATA) == 0) - continue; - - if (pstate && pstate->numWorkers > 1) - { - /* - * If we are in a parallel backup, then we are always the master - * process. 
Dispatch each data-transfer job to a worker. - */ - DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP, - mark_dump_job_done, NULL); - } - else - WriteDataChunksForTocEntry(AH, te); - } - - /* - * If parallel, wait for workers to finish. - */ if (pstate && pstate->numWorkers > 1) + { + /* + * In parallel mode, this code runs in the master process. We + * construct an array of candidate TEs, then sort it into decreasing + * size order, then dispatch each TE to a data-transfer worker. By + * dumping larger tables first, we avoid getting into a situation + * where we're down to one job and it's big, losing parallelism. + */ + TocEntry **tes; + int ntes; + + tes = (TocEntry **) pg_malloc(AH->tocCount * sizeof(TocEntry *)); + ntes = 0; + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + /* Consider only TEs with dataDumper functions ... */ + if (!te->dataDumper) + continue; + /* ... and ignore ones not enabled for dump */ + if ((te->reqs & REQ_DATA) == 0) + continue; + + tes[ntes++] = te; + } + + if (ntes > 1) + qsort((void *) tes, ntes, sizeof(TocEntry *), + TocEntrySizeCompare); + + for (int i = 0; i < ntes; i++) + DispatchJobForTocEntry(AH, pstate, tes[i], ACT_DUMP, + mark_dump_job_done, NULL); + + pg_free(tes); + + /* Now wait for workers to finish. */ WaitForWorkers(AH, pstate, WFW_ALL_IDLE); + } + else + { + /* Non-parallel mode: just dump all candidate TEs sequentially. */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + /* Must have same filter conditions as above */ + if (!te->dataDumper) + continue; + if ((te->reqs & REQ_DATA) == 0) + continue; + + WriteDataChunksForTocEntry(AH, te); + } + } } @@ -2690,6 +2753,7 @@ ReadToc(ArchiveHandle *AH) te->dependencies = NULL; te->nDeps = 0; } + te->dataLength = 0; if (AH->ReadExtraTocPtr) AH->ReadExtraTocPtr(AH, te); @@ -3996,7 +4060,7 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) else { /* Nope, so add it to pending_list */ - par_list_append(pending_list, next_work_item); + pending_list_append(pending_list, next_work_item); } } @@ -4035,11 +4099,14 @@ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - TocEntry ready_list; + ParallelReadyList ready_list; TocEntry *next_work_item; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); + /* Set up ready_list with enough room for all known TocEntrys */ + ready_list_init(&ready_list, AH->tocCount); + /* * The pending_list contains all items that we need to restore. Move all * items that are available to process immediately into the ready_list. @@ -4048,7 +4115,6 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, * contains items that have no remaining dependencies and are OK to * process in the current restore pass. 
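Items destined for later passes wait in the pending list until move_to_ready_list is invoked again for their pass.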
*/ - par_list_header_init(&ready_list); AH->restorePass = RESTORE_PASS_MAIN; move_to_ready_list(pending_list, &ready_list, AH->restorePass); @@ -4064,7 +4130,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, for (;;) { /* Look for an item ready to be dispatched to a worker */ - next_work_item = get_next_work_item(AH, &ready_list, pstate); + next_work_item = pop_next_work_item(AH, &ready_list, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ @@ -4073,8 +4139,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, ahlog(AH, 1, "skipping item %d %s %s\n", next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Drop it from ready_list, and update its dependencies */ - par_list_remove(next_work_item); + /* Update its dependencies as though we'd completed it */ reduce_dependencies(AH, next_work_item, &ready_list); /* Loop around to see if anything else can be dispatched */ continue; @@ -4084,9 +4149,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - /* Remove it from ready_list, and dispatch to some worker */ - par_list_remove(next_work_item); - + /* Dispatch to some worker */ DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE, mark_restore_job_done, &ready_list); } @@ -4132,7 +4195,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, } /* There should now be nothing in ready_list. */ - Assert(ready_list.par_next == &ready_list); + Assert(ready_list.first_te > ready_list.last_te); + + ready_list_free(&ready_list); ahlog(AH, 1, "finished main parallel loop\n"); } @@ -4170,7 +4235,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) * connection. We don't sweat about RestorePass ordering; it's likely we * already violated that. */ - for (te = pending_list->par_next; te != pending_list; te = te->par_next) + for (te = pending_list->pending_next; te != pending_list; te = te->pending_next) { ahlog(AH, 1, "processing missed item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4201,36 +4266,130 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2) /* - * Initialize the header of a parallel-processing list. + * Initialize the header of the pending-items list. * - * These are circular lists with a dummy TocEntry as header, just like the + * This is a circular list with a dummy TocEntry as header, just like the * main TOC list; but we use separate list links so that an entry can be in - * the main TOC list as well as in a parallel-processing list. + * the main TOC list as well as in the pending list. 
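An empty list has the header pointing at itself, so the scan idiom used
elsewhere in this file (see move_to_ready_list and
restore_toc_entries_postfork) is:

    for (te = l->pending_next; te != l; te = te->pending_next)
        ...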
*/ static void -par_list_header_init(TocEntry *l) +pending_list_header_init(TocEntry *l) { - l->par_prev = l->par_next = l; + l->pending_prev = l->pending_next = l; } -/* Append te to the end of the parallel-processing list headed by l */ +/* Append te to the end of the pending-list headed by l */ static void -par_list_append(TocEntry *l, TocEntry *te) +pending_list_append(TocEntry *l, TocEntry *te) { - te->par_prev = l->par_prev; - l->par_prev->par_next = te; - l->par_prev = te; - te->par_next = l; + te->pending_prev = l->pending_prev; + l->pending_prev->pending_next = te; + l->pending_prev = te; + te->pending_next = l; } -/* Remove te from whatever parallel-processing list it's in */ +/* Remove te from the pending-list */ static void -par_list_remove(TocEntry *te) +pending_list_remove(TocEntry *te) { - te->par_prev->par_next = te->par_next; - te->par_next->par_prev = te->par_prev; - te->par_prev = NULL; - te->par_next = NULL; + te->pending_prev->pending_next = te->pending_next; + te->pending_next->pending_prev = te->pending_prev; + te->pending_prev = NULL; + te->pending_next = NULL; +} + + +/* + * Initialize the ready_list with enough room for up to tocCount entries. + */ +static void +ready_list_init(ParallelReadyList *ready_list, int tocCount) +{ + ready_list->tes = (TocEntry **) + pg_malloc(tocCount * sizeof(TocEntry *)); + ready_list->first_te = 0; + ready_list->last_te = -1; + ready_list->sorted = false; +} + +/* + * Free storage for a ready_list. + */ +static void +ready_list_free(ParallelReadyList *ready_list) +{ + pg_free(ready_list->tes); +} + +/* Add te to the ready_list */ +static void +ready_list_insert(ParallelReadyList *ready_list, TocEntry *te) +{ + ready_list->tes[++ready_list->last_te] = te; + /* List is (probably) not sorted anymore. */ + ready_list->sorted = false; +} + +/* Remove the i'th entry in the ready_list */ +static void +ready_list_remove(ParallelReadyList *ready_list, int i) +{ + int f = ready_list->first_te; + + Assert(i >= f && i <= ready_list->last_te); + + /* + * In the typical case where the item to be removed is the first ready + * entry, we need only increment first_te to remove it. Otherwise, move + * the entries before it to compact the list. (This preserves sortedness, + * if any.) We could alternatively move the entries after i, but there + * are typically many more of those. + */ + if (i > f) + { + TocEntry **first_te_ptr = &ready_list->tes[f]; + + memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *)); + } + ready_list->first_te++; +} + +/* Sort the ready_list into the desired order */ +static void +ready_list_sort(ParallelReadyList *ready_list) +{ + if (!ready_list->sorted) + { + int n = ready_list->last_te - ready_list->first_te + 1; + + if (n > 1) + qsort(ready_list->tes + ready_list->first_te, n, + sizeof(TocEntry *), + TocEntrySizeCompare); + ready_list->sorted = true; + } +} + +/* qsort comparator for sorting TocEntries by dataLength */ +static int +TocEntrySizeCompare(const void *p1, const void *p2) +{ + const TocEntry *te1 = *(const TocEntry *const *) p1; + const TocEntry *te2 = *(const TocEntry *const *) p2; + + /* Sort by decreasing dataLength */ + if (te1->dataLength > te2->dataLength) + return -1; + if (te1->dataLength < te2->dataLength) + return 1; + + /* For equal dataLengths, sort by dumpId, just to be stable */ + if (te1->dumpId < te2->dumpId) + return -1; + if (te1->dumpId > te2->dumpId) + return 1; + + return 0; } @@ -4242,52 +4401,50 @@ par_list_remove(TocEntry *te) * which applies the same logic one-at-a-time.) 
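Entries are appended to the ready_list unsorted; sorting happens lazily
in pop_next_work_item.  The ready_list's life cycle, in sketch:

    ready_list_init(&ready_list, AH->tocCount);
    ... ready_list_insert() here and in reduce_dependencies() ...
    ... ready_list_sort() + removal via pop_next_work_item() ...
    ready_list_free(&ready_list);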
*/ static void -move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, +move_to_ready_list(TocEntry *pending_list, + ParallelReadyList *ready_list, RestorePass pass) { TocEntry *te; TocEntry *next_te; - for (te = pending_list->par_next; te != pending_list; te = next_te) + for (te = pending_list->pending_next; te != pending_list; te = next_te) { - /* must save list link before possibly moving te to other list */ - next_te = te->par_next; + /* must save list link before possibly removing te from list */ + next_te = te->pending_next; if (te->depCount == 0 && _tocEntryRestorePass(te) == pass) { /* Remove it from pending_list ... */ - par_list_remove(te); + pending_list_remove(te); /* ... and add to ready_list */ - par_list_append(ready_list, te); + ready_list_insert(ready_list, te); } } } /* - * Find the next work item (if any) that is capable of being run now. + * Find the next work item (if any) that is capable of being run now, + * and remove it from the ready_list. + * + * Returns the item, or NULL if nothing is runnable. * * To qualify, the item must have no remaining dependencies * and no requirements for locks that are incompatible with * items currently running. Items in the ready_list are known to have * no remaining dependencies, but we have to check for lock conflicts. * - * Note that the returned item has *not* been removed from ready_list. - * The caller must do that after successfully dispatching the item. - * * pref_non_data is for an alternative selection algorithm that gives * preference to non-data items if there is already a data load running. * It is currently disabled. */ static TocEntry * -get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, +pop_next_work_item(ArchiveHandle *AH, ParallelReadyList *ready_list, ParallelState *pstate) { bool pref_non_data = false; /* or get from AH->ropt */ - TocEntry *data_te = NULL; - TocEntry *te; - int i, - k; + int data_te_index = -1; /* * Bogus heuristics for pref_non_data @@ -4296,7 +4453,7 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, { int count = 0; - for (k = 0; k < pstate->numWorkers; k++) + for (int k = 0; k < pstate->numWorkers; k++) { TocEntry *running_te = pstate->te[k]; @@ -4308,11 +4465,17 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, pref_non_data = false; } + /* + * Sort the ready_list so that we'll tackle larger jobs first. + */ + ready_list_sort(ready_list); + /* * Search the ready_list until we find a suitable item. */ - for (te = ready_list->par_next; te != ready_list; te = te->par_next) + for (int i = ready_list->first_te; i <= ready_list->last_te; i++) { + TocEntry *te = ready_list->tes[i]; bool conflicts = false; /* @@ -4320,9 +4483,9 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, * that a currently running item also needs lock on, or vice versa. If * so, we don't want to schedule them together. 
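Since the list was just sorted by decreasing dataLength, the first entry that passes this check is the largest runnable job.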
*/ - for (i = 0; i < pstate->numWorkers; i++) + for (int k = 0; k < pstate->numWorkers; k++) { - TocEntry *running_te = pstate->te[i]; + TocEntry *running_te = pstate->te[k]; if (running_te == NULL) continue; @@ -4339,17 +4502,23 @@ get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, if (pref_non_data && te->section == SECTION_DATA) { - if (data_te == NULL) - data_te = te; + if (data_te_index < 0) + data_te_index = i; continue; } /* passed all tests, so this item can run */ + ready_list_remove(ready_list, i); return te; } - if (data_te != NULL) + if (data_te_index >= 0) + { + TocEntry *data_te = ready_list->tes[data_te_index]; + + ready_list_remove(ready_list, data_te_index); return data_te; + } ahlog(AH, 2, "no item ready\n"); return NULL; @@ -4393,7 +4562,7 @@ mark_restore_job_done(ArchiveHandle *AH, int status, void *callback_data) { - TocEntry *ready_list = (TocEntry *) callback_data; + ParallelReadyList *ready_list = (ParallelReadyList *) callback_data; ahlog(AH, 1, "finished item %d %s %s\n", te->dumpId, te->desc, te->tag); @@ -4443,8 +4612,8 @@ fix_dependencies(ArchiveHandle *AH) te->depCount = te->nDeps; te->revDeps = NULL; te->nRevDeps = 0; - te->par_prev = NULL; - te->par_next = NULL; + te->pending_prev = NULL; + te->pending_next = NULL; } /* @@ -4551,6 +4720,12 @@ fix_dependencies(ArchiveHandle *AH) /* * Change dependencies on table items to depend on table data items instead, * but only in POST_DATA items. + * + * Also, for any item having such dependency(s), set its dataLength to the + * largest dataLength of the table data items it depends on. This ensures + * that parallel restore will prioritize larger jobs (index builds, FK + * constraint checks, etc) over smaller ones, avoiding situations where we + * end a restore with only one active job working on a large table. */ static void repoint_table_dependencies(ArchiveHandle *AH) @@ -4569,9 +4744,13 @@ repoint_table_dependencies(ArchiveHandle *AH) if (olddep <= AH->maxDumpId && AH->tableDataId[olddep] != 0) { - te->dependencies[i] = AH->tableDataId[olddep]; + DumpId tabledataid = AH->tableDataId[olddep]; + TocEntry *tabledatate = AH->tocsByDumpId[tabledataid]; + + te->dependencies[i] = tabledataid; + te->dataLength = Max(te->dataLength, tabledatate->dataLength); ahlog(AH, 2, "transferring dependency %d -> %d to %d\n", - te->dumpId, olddep, AH->tableDataId[olddep]); + te->dumpId, olddep, tabledataid); } } } @@ -4647,7 +4826,8 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te) * becomes ready should be moved to the ready_list, if that's provided. */ static void -reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) +reduce_dependencies(ArchiveHandle *AH, TocEntry *te, + ParallelReadyList *ready_list) { int i; @@ -4670,13 +4850,13 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) */ if (otherte->depCount == 0 && _tocEntryRestorePass(otherte) == AH->restorePass && - otherte->par_prev != NULL && + otherte->pending_prev != NULL && ready_list != NULL) { /* Remove it from pending list ... */ - par_list_remove(otherte); + pending_list_remove(otherte); /* ... 
and add to ready_list */ - par_list_append(ready_list, otherte); + ready_list_insert(ready_list, otherte); } } } diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 8dd1915998..26dd0442e8 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -162,12 +162,12 @@ typedef int (*WriteBytePtrType) (ArchiveHandle *AH, const int i); typedef int (*ReadBytePtrType) (ArchiveHandle *AH); typedef void (*WriteBufPtrType) (ArchiveHandle *AH, const void *c, size_t len); typedef void (*ReadBufPtrType) (ArchiveHandle *AH, void *buf, size_t len); -typedef void (*SaveArchivePtrType) (ArchiveHandle *AH); typedef void (*WriteExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te); typedef void (*ReadExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te); typedef void (*PrintExtraTocPtrType) (ArchiveHandle *AH, TocEntry *te); typedef void (*PrintTocDataPtrType) (ArchiveHandle *AH, TocEntry *te); +typedef void (*PrepParallelRestorePtrType) (ArchiveHandle *AH); typedef void (*ClonePtrType) (ArchiveHandle *AH); typedef void (*DeClonePtrType) (ArchiveHandle *AH); @@ -297,6 +297,7 @@ struct _archiveHandle WorkerJobDumpPtrType WorkerJobDumpPtr; WorkerJobRestorePtrType WorkerJobRestorePtr; + PrepParallelRestorePtrType PrepParallelRestorePtr; ClonePtrType ClonePtr; /* Clone format-specific fields */ DeClonePtrType DeClonePtr; /* Clean up cloned fields */ @@ -387,12 +388,13 @@ struct _tocEntry void *formatData; /* TOC Entry data specific to file format */ /* working state while dumping/restoring */ + pgoff_t dataLength; /* item's data size; 0 if none or unknown */ teReqs reqs; /* do we need schema and/or data of object */ bool created; /* set for DATA member if TABLE was created */ /* working state (needed only for parallel restore) */ - struct _tocEntry *par_prev; /* list links for pending/ready items; */ - struct _tocEntry *par_next; /* these are NULL if not in either list */ + struct _tocEntry *pending_prev; /* list links for pending-items list; */ + struct _tocEntry *pending_next; /* NULL if not in that list */ int depCount; /* number of dependencies not yet restored */ DumpId *revDeps; /* dumpIds of objects depending on this one */ int nRevDeps; /* number of such dependencies */ @@ -405,6 +407,18 @@ extern void on_exit_close_archive(Archive *AHX); extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) 
pg_attribute_printf(3, 4); +/* Called to add a TOC entry */ +extern TocEntry *ArchiveEntry(Archive *AHX, + CatalogId catalogId, DumpId dumpId, + const char *tag, + const char *namespace, const char *tablespace, + const char *owner, bool withOids, + const char *desc, teSection section, + const char *defn, + const char *dropStmt, const char *copyStmt, + const DumpId *deps, int nDeps, + DataDumperPtr dumpFn, void *dumpArg); + extern void WriteTOC(ArchiveHandle *AH); extern void ReadTOC(ArchiveHandle *AH); extern void WriteHead(ArchiveHandle *AH); diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index ad18a6c684..96f44e88b1 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -59,6 +59,8 @@ static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH, bool drop); + +static void _PrepParallelRestore(ArchiveHandle *AH); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); @@ -129,6 +131,8 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->StartBlobPtr = _StartBlob; AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + + AH->PrepParallelRestorePtr = _PrepParallelRestore; AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; @@ -775,6 +779,66 @@ _ReopenArchive(ArchiveHandle *AH) strerror(errno)); } +/* + * Prepare for parallel restore. + * + * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS + * TOC entries' dataLength fields with appropriate values to guide the + * ordering of restore jobs. The source of said data is format-dependent, + * as is the exact meaning of the values. + * + * A format module might also choose to do other setup here. + */ +static void +_PrepParallelRestore(ArchiveHandle *AH) +{ + lclContext *ctx = (lclContext *) AH->formatData; + TocEntry *prev_te = NULL; + lclTocEntry *prev_tctx = NULL; + TocEntry *te; + + /* + * Knowing that the data items were dumped out in TOC order, we can + * reconstruct the length of each item as the delta to the start offset of + * the next data item. + */ + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + + /* + * Ignore entries without a known data offset; if we were unable to + * seek to rewrite the TOC when creating the archive, this'll be all + * of them, and we'll end up with no size estimates. + */ + if (tctx->dataState != K_OFFSET_POS_SET) + continue; + + /* Compute previous data item's length */ + if (prev_te) + { + if (tctx->dataPos > prev_tctx->dataPos) + prev_te->dataLength = tctx->dataPos - prev_tctx->dataPos; + } + + prev_te = te; + prev_tctx = tctx; + } + + /* If OK to seek, we can determine the length of the last item */ + if (prev_te && ctx->hasSeek) + { + pgoff_t endpos; + + if (fseeko(AH->FH, 0, SEEK_END) != 0) + exit_horribly(modulename, "error during file seek: %s\n", + strerror(errno)); + endpos = ftello(AH->FH); + if (endpos > prev_tctx->dataPos) + prev_te->dataLength = endpos - prev_tctx->dataPos; + } +} + /* * Clone format-specific fields during parallel restoration. 
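Each parallel worker operates on its own clone of the ArchiveHandle, so the format-private state must be copied rather than shared.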
*/ diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 4aabb40f59..cda90b9a2a 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -87,6 +87,7 @@ static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); static void _LoadBlobs(ArchiveHandle *AH); +static void _PrepParallelRestore(ArchiveHandle *AH); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); @@ -132,6 +133,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) AH->EndBlobPtr = _EndBlob; AH->EndBlobsPtr = _EndBlobs; + AH->PrepParallelRestorePtr = _PrepParallelRestore; AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; @@ -240,13 +242,13 @@ _ArchiveEntry(ArchiveHandle *AH, TocEntry *te) char fn[MAXPGPATH]; tctx = (lclTocEntry *) pg_malloc0(sizeof(lclTocEntry)); - if (te->dataDumper) + if (strcmp(te->desc, "BLOBS") == 0) + tctx->filename = pg_strdup("blobs.toc"); + else if (te->dataDumper) { snprintf(fn, MAXPGPATH, "%d.dat", te->dumpId); tctx->filename = pg_strdup(fn); } - else if (strcmp(te->desc, "BLOBS") == 0) - tctx->filename = pg_strdup("blobs.toc"); else tctx->filename = NULL; @@ -726,6 +728,68 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename) strcat(buf, relativeFilename); } +/* + * Prepare for parallel restore. + * + * The main thing that needs to happen here is to fill in TABLE DATA and BLOBS + * TOC entries' dataLength fields with appropriate values to guide the + * ordering of restore jobs. The source of said data is format-dependent, + * as is the exact meaning of the values. + * + * A format module might also choose to do other setup here. + */ +static void +_PrepParallelRestore(ArchiveHandle *AH) +{ + TocEntry *te; + + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + lclTocEntry *tctx = (lclTocEntry *) te->formatData; + char fname[MAXPGPATH]; + struct stat st; + + /* + * A dumpable object has set tctx->filename, any other object has not. + * (see _ArchiveEntry). + */ + if (tctx->filename == NULL) + continue; + + /* We may ignore items not due to be restored */ + if ((te->reqs & REQ_DATA) == 0) + continue; + + /* + * Stat the file and, if successful, put its size in dataLength. When + * using compression, the physical file size might not be a very good + * guide to the amount of work involved in restoring the file, but we + * only need an approximate indicator of that. + */ + setFilePath(AH, fname, tctx->filename); + + if (stat(fname, &st) == 0) + te->dataLength = st.st_size; + else + { + /* It might be compressed */ + strlcat(fname, ".gz", sizeof(fname)); + if (stat(fname, &st) == 0) + te->dataLength = st.st_size; + } + + /* + * If this is the BLOBS entry, what we stat'd was blobs.toc, which + * most likely is a lot smaller than the actual blob data. We don't + * have a cheap way to estimate how much smaller, but fortunately it + * doesn't matter too much as long as we get the blobs processed + * reasonably early. Arbitrarily scale up by a factor of 1K. + */ + if (strcmp(te->desc, "BLOBS") == 0) + te->dataLength *= 1024; + } +} + /* * Clone format-specific fields during parallel restoration. 
*/ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index f0ea83e6a9..0687a81914 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -54,6 +54,7 @@ #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" #include "libpq/libpq-fs.h" +#include "storage/block.h" #include "dumputils.h" #include "parallel.h" @@ -845,10 +846,6 @@ main(int argc, char **argv) */ sortDumpableObjectsByTypeName(dobjs, numObjs); - /* If we do a parallel dump, we want the largest tables to go first */ - if (archiveFormat == archDirectory && numWorkers > 1) - sortDataAndIndexObjectsBySize(dobjs, numObjs); - sortDumpableObjects(dobjs, numObjs, boundaryObjs[0].dumpId, boundaryObjs[1].dumpId); @@ -2156,13 +2153,28 @@ dumpTableData(Archive *fout, TableDataInfo *tdinfo) * See comments for BuildArchiveDependencies. */ if (tdinfo->dobj.dump & DUMP_COMPONENT_DATA) - ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId, - tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name, - NULL, tbinfo->rolname, - false, "TABLE DATA", SECTION_DATA, - "", "", copyStmt, - &(tbinfo->dobj.dumpId), 1, - dumpFn, tdinfo); + { + TocEntry *te; + + te = ArchiveEntry(fout, tdinfo->dobj.catId, tdinfo->dobj.dumpId, + tbinfo->dobj.name, tbinfo->dobj.namespace->dobj.name, + NULL, tbinfo->rolname, + false, "TABLE DATA", SECTION_DATA, + "", "", copyStmt, + &(tbinfo->dobj.dumpId), 1, + dumpFn, tdinfo); + + /* + * Set the TocEntry's dataLength in case we are doing a parallel dump + * and want to order dump jobs by table size. We choose to measure + * dataLength in table pages during dump, so no scaling is needed. + * However, relpages is declared as "integer" in pg_class, and hence + * also in TableInfo, but it's really BlockNumber a/k/a unsigned int. + * Cast so that we get the right interpretation of table sizes + * exceeding INT_MAX pages. 
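(At the default 8kB block size, INT_MAX pages corresponds to 16TB, so such tables are uncommon but not impossible.)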
+ */ + te->dataLength = (BlockNumber) tbinfo->relpages; + } destroyPQExpBuffer(copyBuf); destroyPQExpBuffer(clistBuf); @@ -6759,8 +6771,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) i_conoid, i_condef, i_tablespace, - i_indreloptions, - i_relpages; + i_indreloptions; int ntups; for (i = 0; i < numTables; i++) @@ -6807,7 +6818,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) "i.indnkeyatts AS indnkeyatts, " "i.indnatts AS indnatts, " "i.indkey, i.indisclustered, " - "i.indisreplident, t.relpages, " + "i.indisreplident, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " @@ -6844,7 +6855,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) "i.indnatts AS indnkeyatts, " "i.indnatts AS indnatts, " "i.indkey, i.indisclustered, " - "i.indisreplident, t.relpages, " + "i.indisreplident, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " @@ -6877,7 +6888,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) "i.indnatts AS indnkeyatts, " "i.indnatts AS indnatts, " "i.indkey, i.indisclustered, " - "false AS indisreplident, t.relpages, " + "false AS indisreplident, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " @@ -6906,7 +6917,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) "i.indnatts AS indnkeyatts, " "i.indnatts AS indnatts, " "i.indkey, i.indisclustered, " - "false AS indisreplident, t.relpages, " + "false AS indisreplident, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " @@ -6938,7 +6949,7 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) "t.relnatts AS indnkeyatts, " "t.relnatts AS indnatts, " "i.indkey, i.indisclustered, " - "false AS indisreplident, t.relpages, " + "false AS indisreplident, " "c.contype, c.conname, " "c.condeferrable, c.condeferred, " "c.tableoid AS contableoid, " @@ -6974,7 +6985,6 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) i_indkey = PQfnumber(res, "indkey"); i_indisclustered = PQfnumber(res, "indisclustered"); i_indisreplident = PQfnumber(res, "indisreplident"); - i_relpages = PQfnumber(res, "relpages"); i_contype = PQfnumber(res, "contype"); i_conname = PQfnumber(res, "conname"); i_condeferrable = PQfnumber(res, "condeferrable"); @@ -7013,7 +7023,6 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables) indxinfo[j].indisclustered = (PQgetvalue(res, j, i_indisclustered)[0] == 't'); indxinfo[j].indisreplident = (PQgetvalue(res, j, i_indisreplident)[0] == 't'); indxinfo[j].parentidx = atooid(PQgetvalue(res, j, i_parentidx)); - indxinfo[j].relpages = atoi(PQgetvalue(res, j, i_relpages)); contype = *(PQgetvalue(res, j, i_contype)); if (contype == 'p' || contype == 'u' || contype == 'x') @@ -8206,6 +8215,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) "'' AS attfdwoptions,\n"); if (fout->remoteVersion >= 90100) + { /* * Since we only want to dump COLLATE clauses for attributes whose * collation is different from their type's default, we use a CASE @@ -8214,6 +8224,7 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) appendPQExpBuffer(q, "CASE WHEN a.attcollation <> t.typcollation " "THEN a.attcollation ELSE 0 END AS attcollation,\n"); + } else appendPQExpBuffer(q, "0 AS attcollation,\n"); @@ -8225,8 +8236,8 @@ getTableAttrs(Archive *fout, TableInfo *tblinfo, int numTables) appendPQExpBuffer(q, "'' AS attoptions\n"); + /* need left join here to 
not fail on dropped columns ... */ appendPQExpBuffer(q, - /* need left join here to not fail on dropped columns ... */ "FROM pg_catalog.pg_attribute a LEFT JOIN pg_catalog.pg_type t " "ON a.atttypid = t.oid\n" "WHERE a.attrelid = '%u'::pg_catalog.oid " @@ -9772,12 +9783,31 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) break; case DO_BLOB_DATA: if (dobj->dump & DUMP_COMPONENT_DATA) - ArchiveEntry(fout, dobj->catId, dobj->dumpId, - dobj->name, NULL, NULL, "", - false, "BLOBS", SECTION_DATA, - "", "", NULL, - NULL, 0, - dumpBlobs, NULL); + { + TocEntry *te; + + te = ArchiveEntry(fout, dobj->catId, dobj->dumpId, + dobj->name, NULL, NULL, "", + false, "BLOBS", SECTION_DATA, + "", "", NULL, + NULL, 0, + dumpBlobs, NULL); + + /* + * Set the TocEntry's dataLength in case we are doing a + * parallel dump and want to order dump jobs by table size. + * (We need some size estimate for every TocEntry with a + * DataDumper function.) We don't currently have any cheap + * way to estimate the size of blobs, but it doesn't matter; + * let's just set the size to a large value so parallel dumps + * will launch this job first. If there's lots of blobs, we + * win, and if there aren't, we don't lose much. (If you want + * to improve on this, really what you should be thinking + * about is allowing blob dumping to be parallelized, not just + * getting a smarter estimate for the single TOC entry.) + */ + te->dataLength = MaxBlockNumber; + } break; case DO_POLICY: dumpPolicy(fout, (PolicyInfo *) dobj); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1448005f30..685ad78669 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -370,7 +370,6 @@ typedef struct _indxInfo Oid parentidx; /* if partitioned, parent index OID */ /* if there is an associated constraint object, its dumpId: */ DumpId indexconstraint; - int relpages; /* relpages of the underlying table */ } IndxInfo; typedef struct _indexAttachInfo @@ -677,7 +676,6 @@ extern void parseOidArray(const char *str, Oid *array, int arraysize); extern void sortDumpableObjects(DumpableObject **objs, int numObjs, DumpId preBoundaryId, DumpId postBoundaryId); extern void sortDumpableObjectsByTypeName(DumpableObject **objs, int numObjs); -extern void sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs); /* * version specific routines diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 6227a8fd26..a1d3ced318 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -35,10 +35,6 @@ static const char *modulename = gettext_noop("sorter"); * pg_dump.c; that is, PRE_DATA objects must sort before DO_PRE_DATA_BOUNDARY, * POST_DATA objects must sort after DO_POST_DATA_BOUNDARY, and DATA objects * must sort between them. - * - * Note: sortDataAndIndexObjectsBySize wants to have all DO_TABLE_DATA and - * DO_INDEX objects in contiguous chunks, so do not reuse the values for those - * for other object types. 
*/ static const int dbObjectTypePriority[] = { @@ -111,96 +107,6 @@ static void repairDependencyLoop(DumpableObject **loop, static void describeDumpableObject(DumpableObject *obj, char *buf, int bufsize); -static int DOSizeCompare(const void *p1, const void *p2); - -static int -findFirstEqualType(DumpableObjectType type, DumpableObject **objs, int numObjs) -{ - int i; - - for (i = 0; i < numObjs; i++) - if (objs[i]->objType == type) - return i; - return -1; -} - -static int -findFirstDifferentType(DumpableObjectType type, DumpableObject **objs, int numObjs, int start) -{ - int i; - - for (i = start; i < numObjs; i++) - if (objs[i]->objType != type) - return i; - return numObjs - 1; -} - -/* - * When we do a parallel dump, we want to start with the largest items first. - * - * Say we have the objects in this order: - * ....DDDDD....III.... - * - * with D = Table data, I = Index, . = other object - * - * This sorting function now takes each of the D or I blocks and sorts them - * according to their size. - */ -void -sortDataAndIndexObjectsBySize(DumpableObject **objs, int numObjs) -{ - int startIdx, - endIdx; - void *startPtr; - - if (numObjs <= 1) - return; - - startIdx = findFirstEqualType(DO_TABLE_DATA, objs, numObjs); - if (startIdx >= 0) - { - endIdx = findFirstDifferentType(DO_TABLE_DATA, objs, numObjs, startIdx); - startPtr = objs + startIdx; - qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *), - DOSizeCompare); - } - - startIdx = findFirstEqualType(DO_INDEX, objs, numObjs); - if (startIdx >= 0) - { - endIdx = findFirstDifferentType(DO_INDEX, objs, numObjs, startIdx); - startPtr = objs + startIdx; - qsort(startPtr, endIdx - startIdx, sizeof(DumpableObject *), - DOSizeCompare); - } -} - -static int -DOSizeCompare(const void *p1, const void *p2) -{ - DumpableObject *obj1 = *(DumpableObject **) p1; - DumpableObject *obj2 = *(DumpableObject **) p2; - int obj1_size = 0; - int obj2_size = 0; - - if (obj1->objType == DO_TABLE_DATA) - obj1_size = ((TableDataInfo *) obj1)->tdtable->relpages; - if (obj1->objType == DO_INDEX) - obj1_size = ((IndxInfo *) obj1)->relpages; - - if (obj2->objType == DO_TABLE_DATA) - obj2_size = ((TableDataInfo *) obj2)->tdtable->relpages; - if (obj2->objType == DO_INDEX) - obj2_size = ((IndxInfo *) obj2)->relpages; - - /* we want to see the biggest item go first */ - if (obj1_size > obj2_size) - return -1; - if (obj2_size > obj1_size) - return 1; - - return 0; -} /* * Sort the given objects into a type/name-based ordering