diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index f2b544871e..3b8b950f6d 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -59,7 +59,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode, SetupWorkerPtr setupWorkerPtr); static void _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH); -static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass); +static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData); static char *replace_line_endings(const char *str); static void _doSetFixedOutputState(ArchiveHandle *AH); static void _doSetSessionAuth(ArchiveHandle *AH, const char *user); @@ -72,6 +72,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace); static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te); static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te); static teReqs _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt); +static RestorePass _tocEntryRestorePass(TocEntry *te); static bool _tocEntryIsACL(TocEntry *te); static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te); static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te); @@ -87,13 +88,18 @@ static OutputContext SaveOutput(ArchiveHandle *AH); static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext); static int restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel); -static void restore_toc_entries_prefork(ArchiveHandle *AH); -static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, +static void restore_toc_entries_prefork(ArchiveHandle *AH, + TocEntry *pending_list); +static void restore_toc_entries_parallel(ArchiveHandle *AH, + ParallelState *pstate, + TocEntry *pending_list); +static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); -static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list); static void par_list_header_init(TocEntry *l); static void par_list_append(TocEntry *l, TocEntry *te); static void par_list_remove(TocEntry *te); +static void move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, + RestorePass pass); static TocEntry *get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list, ParallelState *pstate); @@ -616,20 +622,18 @@ RestoreArchive(Archive *AHX) AH->currSchema = NULL; } - /* - * In serial mode, we now process each non-ACL TOC entry. - * - * In parallel mode, turn control over to the parallel-restore logic. - */ if (parallel_mode) { + /* + * In parallel mode, turn control over to the parallel-restore logic. + */ ParallelState *pstate; TocEntry pending_list; par_list_header_init(&pending_list); /* This runs PRE_DATA items and then disconnects from the database */ - restore_toc_entries_prefork(AH); + restore_toc_entries_prefork(AH, &pending_list); Assert(AH->connection == NULL); /* ParallelBackupStart() will actually fork the processes */ @@ -643,28 +647,51 @@ RestoreArchive(Archive *AHX) } else { + /* + * In serial mode, process everything in three phases: normal items, + * then ACLs, then matview refresh items. We might be able to skip + * one or both extra phases in some cases, eg data-only restores. + */ + bool haveACL = false; + bool haveRefresh = false; + for (te = AH->toc->next; te != AH->toc; te = te->next) - (void) restore_toc_entry(AH, te, false); - } - - /* - * Scan TOC again to output ownership commands and ACLs - */ - for (te = AH->toc->next; te != AH->toc; te = te->next) - { - AH->currentTE = te; - - /* Both schema and data objects might now have ownership/ACLs */ - if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) { - /* Show namespace if available */ - if (te->namespace) - ahlog(AH, 1, "setting owner and privileges for %s \"%s.%s\"\n", - te->desc, te->namespace, te->tag); - else - ahlog(AH, 1, "setting owner and privileges for %s \"%s\"\n", - te->desc, te->tag); - _printTocEntry(AH, te, false, true); + if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) == 0) + continue; /* ignore if not to be dumped at all */ + + switch (_tocEntryRestorePass(te)) + { + case RESTORE_PASS_MAIN: + (void) restore_toc_entry(AH, te, false); + break; + case RESTORE_PASS_ACL: + haveACL = true; + break; + case RESTORE_PASS_REFRESH: + haveRefresh = true; + break; + } + } + + if (haveACL) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && + _tocEntryRestorePass(te) == RESTORE_PASS_ACL) + (void) restore_toc_entry(AH, te, false); + } + } + + if (haveRefresh) + { + for (te = AH->toc->next; te != AH->toc; te = te->next) + { + if ((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && + _tocEntryRestorePass(te) == RESTORE_PASS_REFRESH) + (void) restore_toc_entry(AH, te, false); + } } } @@ -711,10 +738,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) AH->currentTE = te; /* Work out what, if anything, we want from this entry */ - if (_tocEntryIsACL(te)) - reqs = 0; /* ACLs are never restored here */ - else - reqs = te->reqs; + reqs = te->reqs; /* * Ignore DATABASE entry unless we should create it. We must check this @@ -735,17 +759,19 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) defnDumped = false; - if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */ + /* + * If it has a schema component that we want, then process that + */ + if ((reqs & REQ_SCHEMA) != 0) { - /* Show namespace if available */ + /* Show namespace in log message if available */ if (te->namespace) ahlog(AH, 1, "creating %s \"%s.%s\"\n", te->desc, te->namespace, te->tag); else ahlog(AH, 1, "creating %s \"%s\"\n", te->desc, te->tag); - - _printTocEntry(AH, te, false, false); + _printTocEntry(AH, te, false); defnDumped = true; if (strcmp(te->desc, "TABLE") == 0) @@ -801,7 +827,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) } /* - * If we have a data component, then process it + * If it has a data component that we want, then process that */ if ((reqs & REQ_DATA) != 0) { @@ -817,7 +843,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) */ if (AH->PrintTocDataPtr !=NULL) { - _printTocEntry(AH, te, true, false); + _printTocEntry(AH, te, true); if (strcmp(te->desc, "BLOBS") == 0 || strcmp(te->desc, "BLOB COMMENTS") == 0) @@ -905,7 +931,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel) { /* If we haven't already dumped the defn part, do so now */ ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag); - _printTocEntry(AH, te, false, false); + _printTocEntry(AH, te, false); } } @@ -2897,8 +2923,30 @@ _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt) return res; } +/* + * Identify which pass we should restore this TOC entry in. + * + * See notes with the RestorePass typedef in pg_backup_archiver.h. + */ +static RestorePass +_tocEntryRestorePass(TocEntry *te) +{ + /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */ + if (strcmp(te->desc, "ACL") == 0 || + strcmp(te->desc, "ACL LANGUAGE") == 0 || + strcmp(te->desc, "DEFAULT ACL") == 0) + return RESTORE_PASS_ACL; + if (strcmp(te->desc, "MATERIALIZED VIEW DATA") == 0) + return RESTORE_PASS_REFRESH; + return RESTORE_PASS_MAIN; +} + /* * Identify TOC entries that are ACLs. + * + * Note: it seems worth duplicating some code here to avoid a hard-wired + * assumption that these are exactly the same entries that we restore during + * the RESTORE_PASS_ACL phase. */ static bool _tocEntryIsACL(TocEntry *te) @@ -3316,23 +3364,18 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH) type); } +/* + * Emit the SQL commands to create the object represented by a TOC entry + * + * This now also includes issuing an ALTER OWNER command to restore the + * object's ownership, if wanted. But note that the object's permissions + * will remain at default, until the matching ACL TOC entry is restored. + */ static void -_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass) +_printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData) { RestoreOptions *ropt = AH->public.ropt; - /* ACLs are dumped only during acl pass */ - if (acl_pass) - { - if (!_tocEntryIsACL(te)) - return; - } - else - { - if (_tocEntryIsACL(te)) - return; - } - /* * Avoid dumping the public schema, as it will already be created ... * unless we are using --clean mode (and *not* --create mode), in which @@ -3516,7 +3559,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData, bool acl_pass) * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION * commands, so we can no longer assume we know the current auth setting. */ - if (acl_pass) + if (_tocEntryIsACL(te)) { if (AH->currUser) free(AH->currUser); @@ -3546,6 +3589,9 @@ replace_line_endings(const char *str) return result; } +/* + * Write the file header for a custom-format archive + */ void WriteHead(ArchiveHandle *AH) { @@ -3717,16 +3763,14 @@ dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim) /* * Main engine for parallel restore. * - * Work is done in three phases. - * First we process all SECTION_PRE_DATA tocEntries, in a single connection, - * just as for a standard restore. Second we process the remaining non-ACL - * steps in parallel worker children (threads on Windows, processes on Unix), - * each of which connects separately to the database. Finally we process all - * the ACL entries in a single connection (that happens back in - * RestoreArchive). + * Parallel restore is done in three phases. In this first phase, + * we'll process all SECTION_PRE_DATA TOC entries that are allowed to be + * processed in the RESTORE_PASS_MAIN pass. (In practice, that's all + * PRE_DATA items other than ACLs.) Entries we can't process now are + * added to the pending_list for later phases to deal with. */ static void -restore_toc_entries_prefork(ArchiveHandle *AH) +restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) { bool skipped_some; TocEntry *next_work_item; @@ -3744,23 +3788,31 @@ restore_toc_entries_prefork(ArchiveHandle *AH) * about showing all the dependencies of SECTION_PRE_DATA items, so we do * not risk trying to process them out-of-order. * + * Stuff that we can't do immediately gets added to the pending_list. + * Note: we don't yet filter out entries that aren't going to be restored. + * They might participate in dependency chains connecting entries that + * should be restored, so we treat them as live until we actually process + * them. + * * Note: as of 9.2, it should be guaranteed that all PRE_DATA items appear * before DATA items, and all DATA items before POST_DATA items. That is * not certain to be true in older archives, though, so this loop is coded * to not assume it. */ + AH->restorePass = RESTORE_PASS_MAIN; skipped_some = false; for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) { - /* NB: process-or-continue logic must be the inverse of loop below */ + bool do_now = true; + if (next_work_item->section != SECTION_PRE_DATA) { /* DATA and POST_DATA items are just ignored for now */ if (next_work_item->section == SECTION_DATA || next_work_item->section == SECTION_POST_DATA) { + do_now = false; skipped_some = true; - continue; } else { @@ -3771,18 +3823,35 @@ restore_toc_entries_prefork(ArchiveHandle *AH) * comment's dependencies are satisfied, so skip it for now. */ if (skipped_some) - continue; + do_now = false; } } - ahlog(AH, 1, "processing item %d %s %s\n", - next_work_item->dumpId, - next_work_item->desc, next_work_item->tag); + /* + * Also skip items that need to be forced into later passes. We need + * not set skipped_some in this case, since by assumption no main-pass + * items could depend on these. + */ + if (_tocEntryRestorePass(next_work_item) != RESTORE_PASS_MAIN) + do_now = false; - (void) restore_toc_entry(AH, next_work_item, false); + if (do_now) + { + /* OK, restore the item and update its dependencies */ + ahlog(AH, 1, "processing item %d %s %s\n", + next_work_item->dumpId, + next_work_item->desc, next_work_item->tag); - /* there should be no touch of ready_list here, so pass NULL */ - reduce_dependencies(AH, next_work_item, NULL); + (void) restore_toc_entry(AH, next_work_item, false); + + /* there should be no touch of ready_list here, so pass NULL */ + reduce_dependencies(AH, next_work_item, NULL); + } + else + { + /* Nope, so add it to pending_list */ + par_list_append(pending_list, next_work_item); + } } /* @@ -3808,91 +3877,60 @@ restore_toc_entries_prefork(ArchiveHandle *AH) /* * Main engine for parallel restore. * - * Work is done in three phases. - * First we process all SECTION_PRE_DATA tocEntries, in a single connection, - * just as for a standard restore. This is done in restore_toc_entries_prefork(). - * Second we process the remaining non-ACL steps in parallel worker children - * (threads on Windows, processes on Unix), these fork off and set up their - * connections before we call restore_toc_entries_parallel_forked. - * Finally we process all the ACL entries in a single connection (that happens - * back in RestoreArchive). + * Parallel restore is done in three phases. In this second phase, + * we process entries by dispatching them to parallel worker children + * (processes on Unix, threads on Windows), each of which connects + * separately to the database. Inter-entry dependencies are respected, + * and so is the RestorePass multi-pass structure. When we can no longer + * make any entries ready to process, we exit. Normally, there will be + * nothing left to do; but if there is, the third phase will mop up. */ static void restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, TocEntry *pending_list) { - int work_status; - bool skipped_some; TocEntry ready_list; TocEntry *next_work_item; - int ret_child; ahlog(AH, 2, "entering restore_toc_entries_parallel\n"); /* - * Initialize the lists of ready items, the list for pending items has - * already been initialized in the caller. After this setup, the pending - * list is everything that needs to be done but is blocked by one or more - * dependencies, while the ready list contains items that have no - * remaining dependencies. Note: we don't yet filter out entries that - * aren't going to be restored. They might participate in dependency - * chains connecting entries that should be restored, so we treat them as - * live until we actually process them. + * The pending_list contains all items that we need to restore. Move all + * items that are available to process immediately into the ready_list. + * After this setup, the pending list is everything that needs to be done + * but is blocked by one or more dependencies, while the ready list + * contains items that have no remaining dependencies and are OK to + * process in the current restore pass. */ par_list_header_init(&ready_list); - skipped_some = false; - for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next) - { - /* NB: process-or-continue logic must be the inverse of loop above */ - if (next_work_item->section == SECTION_PRE_DATA) - { - /* All PRE_DATA items were dealt with above */ - continue; - } - if (next_work_item->section == SECTION_DATA || - next_work_item->section == SECTION_POST_DATA) - { - /* set this flag at same point that previous loop did */ - skipped_some = true; - } - else - { - /* SECTION_NONE items must be processed if previous loop didn't */ - if (!skipped_some) - continue; - } - - if (next_work_item->depCount > 0) - par_list_append(pending_list, next_work_item); - else - par_list_append(&ready_list, next_work_item); - } + AH->restorePass = RESTORE_PASS_MAIN; + move_to_ready_list(pending_list, &ready_list, AH->restorePass); /* * main parent loop * * Keep going until there is no worker still running AND there is no work - * left to be done. + * left to be done. Note invariant: at top of loop, there should always + * be at least one worker available to dispatch a job to. */ - ahlog(AH, 1, "entering main parallel loop\n"); - while ((next_work_item = get_next_work_item(AH, &ready_list, pstate)) != NULL || - !IsEveryWorkerIdle(pstate)) + for (;;) { + /* Look for an item ready to be dispatched to a worker */ + next_work_item = get_next_work_item(AH, &ready_list, pstate); if (next_work_item != NULL) { /* If not to be restored, don't waste time launching a worker */ - if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0 || - _tocEntryIsACL(next_work_item)) + if ((next_work_item->reqs & (REQ_SCHEMA | REQ_DATA)) == 0) { ahlog(AH, 1, "skipping item %d %s %s\n", next_work_item->dumpId, next_work_item->desc, next_work_item->tag); - + /* Drop it from ready_list, and update its dependencies */ par_list_remove(next_work_item); reduce_dependencies(AH, next_work_item, &ready_list); - + /* Loop around to see if anything else can be dispatched */ continue; } @@ -3900,18 +3938,40 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, next_work_item->dumpId, next_work_item->desc, next_work_item->tag); + /* Remove it from ready_list, and dispatch to some worker */ par_list_remove(next_work_item); DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE); } + else if (IsEveryWorkerIdle(pstate)) + { + /* + * Nothing is ready and no worker is running, so we're done with + * the current pass or maybe with the whole process. + */ + if (AH->restorePass == RESTORE_PASS_LAST) + break; /* No more parallel processing is possible */ + + /* Advance to next restore pass */ + AH->restorePass++; + /* That probably allows some stuff to be made ready */ + move_to_ready_list(pending_list, &ready_list, AH->restorePass); + /* Loop around to see if anything's now ready */ + continue; + } else { - /* at least one child is working and we have nothing ready. */ + /* + * We have nothing ready, but at least one child is working, so + * wait for some subjob to finish. + */ } for (;;) { int nTerm = 0; + int ret_child; + int work_status; /* * In order to reduce dependencies as soon as possible and @@ -3952,9 +4012,21 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate, } } + /* There should now be nothing in ready_list. */ + Assert(ready_list.par_next == &ready_list); + ahlog(AH, 1, "finished main parallel loop\n"); } +/* + * Main engine for parallel restore. + * + * Parallel restore is done in three phases. In this third phase, + * we mop up any remaining TOC entries by processing them serially. + * This phase normally should have nothing to do, but if we've somehow + * gotten stuck due to circular dependencies or some such, this provides + * at least some chance of completing the restore successfully. + */ static void restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) { @@ -3974,9 +4046,10 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) _doSetFixedOutputState(AH); /* - * Make sure there is no non-ACL work left due to, say, circular - * dependencies, or some other pathological condition. If so, do it in the - * single parent connection. + * Make sure there is no work left due to, say, circular dependencies, or + * some other pathological condition. If so, do it in the single parent + * connection. We don't sweat about RestorePass ordering; it's likely we + * already violated that. */ for (te = pending_list->par_next; te != pending_list; te = te->par_next) { @@ -3984,8 +4057,6 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list) te->dumpId, te->desc, te->tag); (void) restore_toc_entry(AH, te, false); } - - /* The ACLs will be handled back in RestoreArchive. */ } /* @@ -4044,6 +4115,36 @@ par_list_remove(TocEntry *te) } +/* + * Move all immediately-ready items from pending_list to ready_list. + * + * Items are considered ready if they have no remaining dependencies and + * they belong in the current restore pass. (See also reduce_dependencies, + * which applies the same logic one-at-a-time.) + */ +static void +move_to_ready_list(TocEntry *pending_list, TocEntry *ready_list, + RestorePass pass) +{ + TocEntry *te; + TocEntry *next_te; + + for (te = pending_list->par_next; te != pending_list; te = next_te) + { + /* must save list link before possibly moving te to other list */ + next_te = te->par_next; + + if (te->depCount == 0 && + _tocEntryRestorePass(te) == pass) + { + /* Remove it from pending_list ... */ + par_list_remove(te); + /* ... and add to ready_list */ + par_list_append(ready_list, te); + } + } +} + /* * Find the next work item (if any) that is capable of being run now. * @@ -4433,8 +4534,17 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list) { TocEntry *otherte = AH->tocsByDumpId[te->revDeps[i]]; + Assert(otherte->depCount > 0); otherte->depCount--; - if (otherte->depCount == 0 && otherte->par_prev != NULL) + + /* + * It's ready if it has no remaining dependencies and it belongs in + * the current restore pass. However, don't move it if it has not yet + * been put into the pending list. + */ + if (otherte->depCount == 0 && + _tocEntryRestorePass(otherte) == AH->restorePass && + otherte->par_prev != NULL) { /* It must be in the pending list, so remove it ... */ par_list_remove(otherte); diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 0376f2bff7..5eb1fc4949 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -200,6 +200,30 @@ typedef enum OUTPUT_OTHERDATA /* writing data as INSERT commands */ } ArchiverOutput; +/* + * For historical reasons, ACL items are interspersed with everything else in + * a dump file's TOC; typically they're right after the object they're for. + * However, we need to restore data before ACLs, as otherwise a read-only + * table (ie one where the owner has revoked her own INSERT privilege) causes + * data restore failures. On the other hand, matview REFRESH commands should + * come out after ACLs, as otherwise non-superuser-owned matviews might not + * be able to execute. (If the permissions at the time of dumping would not + * allow a REFRESH, too bad; we won't fix that for you.) These considerations + * force us to make three passes over the TOC, restoring the appropriate + * subset of items in each pass. We assume that the dependency sort resulted + * in an appropriate ordering of items within each subset. + * XXX This mechanism should be superseded by tracking dependencies on ACLs + * properly; but we'll still need it for old dump files even after that. + */ +typedef enum +{ + RESTORE_PASS_MAIN = 0, /* Main pass (most TOC item types) */ + RESTORE_PASS_ACL, /* ACL item types */ + RESTORE_PASS_REFRESH /* Matview REFRESH items */ + +#define RESTORE_PASS_LAST RESTORE_PASS_REFRESH +} RestorePass; + typedef enum { REQ_SCHEMA = 0x01, /* want schema */ @@ -331,6 +355,7 @@ struct _archiveHandle int noTocComments; ArchiverStage stage; ArchiverStage lastErrorStage; + RestorePass restorePass; /* used only during parallel restore */ struct _tocEntry *currentTE; struct _tocEntry *lastErrorTE; };