diff --git a/internal/index/master_index.go b/internal/index/master_index.go index a5ee40b52..40d1e3446 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -315,18 +315,190 @@ func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, return mi.MergeFinalIndexes() } -type MasterIndexSaveOpts struct { +type MasterIndexRewriteOpts struct { SaveProgress *progress.Counter DeleteProgress func() *progress.Counter DeleteReport func(id restic.ID, err error) - SkipDeletion bool } -// Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist from finalized indexes. -// It also removes the old index files and those listed in extraObsolete. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts MasterIndexSaveOpts) error { +// Rewrite removes packs whose ID is in excludePacks from all known indexes. +// It also removes the rewritten index files and those listed in extraObsolete. +// If oldIndexes is not nil, then only the indexes in this set are processed. +// This is used by repair index to only rewrite and delete the old indexes. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { + for _, idx := range mi.idx { + if !idx.Final() { + panic("internal error - index must be saved before calling MasterIndex.Rewrite") + } + } + + var indexes restic.IDSet + if oldIndexes != nil { + // repair index adds new index entries for already existing pack files + // only remove the old (possibly broken) entries by only processing old indexes + indexes = oldIndexes + } else { + indexes = mi.IDs() + } + p := opts.SaveProgress + p.SetMax(uint64(len(indexes))) + + // reset state which is not necessary for Rewrite and just consumes a lot of memory + // the index state would be invalid after Rewrite completes anyways + mi.clear() + runtime.GC() + + // copy excludePacks to prevent unintended sideeffects + excludePacks = excludePacks.Clone() + debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks) + wg, wgCtx := errgroup.WithContext(ctx) + + idxCh := make(chan restic.ID) + wg.Go(func() error { + defer close(idxCh) + for id := range indexes { + select { + case idxCh <- id: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) + + var rewriteWg sync.WaitGroup + type rewriteTask struct { + idx *Index + oldFormat bool + } + rewriteCh := make(chan rewriteTask) + loader := func() error { + defer rewriteWg.Done() + for id := range idxCh { + buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id) + if err != nil { + return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err) + } + idx, oldFormat, err := DecodeIndex(buf, id) + if err != nil { + return err + } + + select { + case rewriteCh <- rewriteTask{idx, oldFormat}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + + } + return nil + } + // loading an index can take quite some time such that this can be both CPU- or IO-bound + loaderCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < loaderCount; i++ { + rewriteWg.Add(1) + wg.Go(loader) + } + wg.Go(func() error { + rewriteWg.Wait() + close(rewriteCh) + return nil + }) + + obsolete := restic.NewIDSet(extraObsolete...) + saveCh := make(chan *Index) + + wg.Go(func() error { + defer close(saveCh) + newIndex := NewIndex() + for task := range rewriteCh { + // always rewrite indexes using the old format, that include a pack that must be removed or that are not full + if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) { + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + // index is already up to date + p.Add(1) + continue + } + + ids, err := task.idx.IDs() + if err != nil || len(ids) != 1 { + panic("internal error, index has no ID") + } + obsolete.Merge(restic.NewIDSet(ids...)) + + for pbs := range task.idx.EachByPack(wgCtx, excludePacks) { + newIndex.StorePack(pbs.PackID, pbs.Blobs) + if IndexFull(newIndex, mi.compress) { + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + return wgCtx.Err() + } + newIndex = NewIndex() + } + } + if wgCtx.Err() != nil { + return wgCtx.Err() + } + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + p.Add(1) + } + + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range saveCh { + idx.Finalize() + if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + return err + } + } + return nil + } + + // encoding an index can take quite some time such that this can be both CPU- or IO-bound + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < workerCount; i++ { + wg.Go(worker) + } + err := wg.Wait() + p.Done() + if err != nil { + return fmt.Errorf("failed to rewrite indexes: %w", err) + } + + p = nil + if opts.DeleteProgress != nil { + p = opts.DeleteProgress() + } + defer p.Done() + return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { + if opts.DeleteReport != nil { + opts.DeleteReport(id, err) + } + return err + }, p) +} + +// SaveFallback saves all known indexes to index files, leaving out any +// packs whose ID is contained in packBlacklist from finalized indexes. +// It is only intended for use by prune with the UnsafeRecovery option. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error { p.SetMax(uint64(len(mi.Packs(excludePacks)))) mi.idxMutex.Lock() @@ -334,33 +506,23 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks) - newIndex := NewIndex() - obsolete := restic.NewIDSet(extraObsolete...) - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. + obsolete := restic.NewIDSet() wg, wgCtx := errgroup.WithContext(ctx) ch := make(chan *Index) - wg.Go(func() error { defer close(ch) - for i, idx := range mi.idx { + newIndex := NewIndex() + for _, idx := range mi.idx { if idx.Final() { ids, err := idx.IDs() if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return err + panic("internal error - finalized index without ID") } - debug.Log("adding index ids %v to supersedes field", ids) obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) } - debug.Log("adding index %d", i) - for pbs := range idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) p.Add(1) @@ -396,33 +558,18 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke return nil } - // encoding an index can take quite some time such that this can be both CPU- or IO-bound - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // keep concurrency bounded as we're on a fallback path + workerCount := int(repo.Connections()) // run workers on ch for i := 0; i < workerCount; i++ { wg.Go(worker) } err := wg.Wait() p.Done() - if err != nil { - return err - } + // the index no longer matches to stored state + mi.clear() - if opts.SkipDeletion { - return nil - } - - p = nil - if opts.DeleteProgress != nil { - p = opts.DeleteProgress() - } - defer p.Done() - return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { - if opts.DeleteReport != nil { - opts.DeleteReport(id, err) - } - return err - }, p) + return err } // SaveIndex saves an index in the repository. diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index 41f4cc534..b8a29262e 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -364,7 +364,7 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, index.MasterIndexSaveOpts{})) + rtest.OK(t, idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})) idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 712986e61..895b07994 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -522,7 +522,7 @@ func (plan *PrunePlan) Stats() PruneStats { // - rebuild the index while ignoring all files that will be deleted // - delete the files // plan.removePacks and plan.ignorePacks are modified in this function. -func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) { +func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) error { if plan.opts.DryRun { printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n") if len(plan.removePacksFirst) > 0 { @@ -581,12 +581,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") indexFiles := repo.idx.IDs() - err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) + err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } } else if len(plan.ignorePacks) != 0 { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer) + err := rewriteIndexFiles(ctx, repo, plan.ignorePacks, nil, nil, printer) if err != nil { return errors.Fatalf("%s", err) } @@ -601,16 +601,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e } if plan.opts.UnsafeRecovery { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) + err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed")) if err != nil { return errors.Fatalf("%s", err) } } - if err != nil { - return err - } - // drop outdated in-memory index repo.clearIndex() diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index 4ac6cdd3a..e01131923 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -62,6 +62,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } + oldIndexes := repo.idx.IDs() + printer.P("getting pack files to read...\n") err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { size, ok := packSizeFromIndex[id] @@ -103,7 +105,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } - err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer) + if err := repo.Flush(ctx); err != nil { + return err + } + + err = rewriteIndexFiles(ctx, repo, removePacks, oldIndexes, obsoleteIndexes, printer) if err != nil { return err } @@ -113,11 +119,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, return nil } -func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { +func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, printer progress.Printer) error { printer.P("rebuilding index\n") - bar := printer.NewCounter("packs processed") - return repo.idx.Save(ctx, repo, removePacks, extraObsolete, index.MasterIndexSaveOpts{ + bar := printer.NewCounter("indexes processed") + return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") @@ -129,6 +135,5 @@ func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic printer.VV("removed index %v\n", id.String()) } }, - SkipDeletion: skipDeletion, }) } diff --git a/internal/repository/repair_index_test.go b/internal/repository/repair_index_test.go index 79922e9ec..ac47d59ff 100644 --- a/internal/repository/repair_index_test.go +++ b/internal/repository/repair_index_test.go @@ -30,10 +30,6 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, ReadAllPacks: readAllPacks, }, &progress.NoopPrinter{})) - newIndexes := listIndex(t, repo) - old := indexes.Intersect(newIndexes) - rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old) - checker.TestCheckRepo(t, repo, true) } diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 7cb9d9f3e..811388cc9 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -56,7 +56,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe } // remove salvaged packs from index - err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer) + err = rewriteIndexFiles(ctx, repo, ids, nil, nil, printer) if err != nil { return err }