From 5f7b48e65f302c2ca4965a4f815ef4bba0d2e842 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 20:38:58 +0200 Subject: [PATCH] index: replace Save() method with Rewrite and SaveFallback Rewrite implements a streaming rewrite of the index that excludes the given packs. For this it loads all index files from the repository and only modifies those that require changes. This will reduce the index churn when running prune. Rewrite does not require the in-memory index and thus can drop it to significantly reduce the memory usage. However, `prune --unsafe-recovery` cannot use this strategy and requires a separate method to save the whole in-memory index. This is now handled using SaveFallback. --- internal/index/master_index.go | 227 +++++++++++++++++++---- internal/index/master_index_test.go | 2 +- internal/repository/prune.go | 12 +- internal/repository/repair_index.go | 15 +- internal/repository/repair_index_test.go | 4 - internal/repository/repair_pack.go | 2 +- 6 files changed, 203 insertions(+), 59 deletions(-) diff --git a/internal/index/master_index.go b/internal/index/master_index.go index a5ee40b52..40d1e3446 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -315,18 +315,190 @@ func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, return mi.MergeFinalIndexes() } -type MasterIndexSaveOpts struct { +type MasterIndexRewriteOpts struct { SaveProgress *progress.Counter DeleteProgress func() *progress.Counter DeleteReport func(id restic.ID, err error) - SkipDeletion bool } -// Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist from finalized indexes. -// It also removes the old index files and those listed in extraObsolete. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts MasterIndexSaveOpts) error { +// Rewrite removes packs whose ID is in excludePacks from all known indexes. +// It also removes the rewritten index files and those listed in extraObsolete. +// If oldIndexes is not nil, then only the indexes in this set are processed. +// This is used by repair index to only rewrite and delete the old indexes. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { + for _, idx := range mi.idx { + if !idx.Final() { + panic("internal error - index must be saved before calling MasterIndex.Rewrite") + } + } + + var indexes restic.IDSet + if oldIndexes != nil { + // repair index adds new index entries for already existing pack files + // only remove the old (possibly broken) entries by only processing old indexes + indexes = oldIndexes + } else { + indexes = mi.IDs() + } + p := opts.SaveProgress + p.SetMax(uint64(len(indexes))) + + // reset state which is not necessary for Rewrite and just consumes a lot of memory + // the index state would be invalid after Rewrite completes anyways + mi.clear() + runtime.GC() + + // copy excludePacks to prevent unintended sideeffects + excludePacks = excludePacks.Clone() + debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks) + wg, wgCtx := errgroup.WithContext(ctx) + + idxCh := make(chan restic.ID) + wg.Go(func() error { + defer close(idxCh) + for id := range indexes { + select { + case idxCh <- id: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) + + var rewriteWg sync.WaitGroup + type rewriteTask struct { + idx *Index + oldFormat bool + } + rewriteCh := make(chan rewriteTask) + loader := func() error { + defer rewriteWg.Done() + for id := range idxCh { + buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id) + if err != nil { + return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err) + } + idx, oldFormat, err := DecodeIndex(buf, id) + if err != nil { + return err + } + + select { + case rewriteCh <- rewriteTask{idx, oldFormat}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + + } + return nil + } + // loading an index can take quite some time such that this can be both CPU- or IO-bound + loaderCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < loaderCount; i++ { + rewriteWg.Add(1) + wg.Go(loader) + } + wg.Go(func() error { + rewriteWg.Wait() + close(rewriteCh) + return nil + }) + + obsolete := restic.NewIDSet(extraObsolete...) + saveCh := make(chan *Index) + + wg.Go(func() error { + defer close(saveCh) + newIndex := NewIndex() + for task := range rewriteCh { + // always rewrite indexes using the old format, that include a pack that must be removed or that are not full + if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) { + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + // index is already up to date + p.Add(1) + continue + } + + ids, err := task.idx.IDs() + if err != nil || len(ids) != 1 { + panic("internal error, index has no ID") + } + obsolete.Merge(restic.NewIDSet(ids...)) + + for pbs := range task.idx.EachByPack(wgCtx, excludePacks) { + newIndex.StorePack(pbs.PackID, pbs.Blobs) + if IndexFull(newIndex, mi.compress) { + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + return wgCtx.Err() + } + newIndex = NewIndex() + } + } + if wgCtx.Err() != nil { + return wgCtx.Err() + } + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + p.Add(1) + } + + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range saveCh { + idx.Finalize() + if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + return err + } + } + return nil + } + + // encoding an index can take quite some time such that this can be both CPU- or IO-bound + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < workerCount; i++ { + wg.Go(worker) + } + err := wg.Wait() + p.Done() + if err != nil { + return fmt.Errorf("failed to rewrite indexes: %w", err) + } + + p = nil + if opts.DeleteProgress != nil { + p = opts.DeleteProgress() + } + defer p.Done() + return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { + if opts.DeleteReport != nil { + opts.DeleteReport(id, err) + } + return err + }, p) +} + +// SaveFallback saves all known indexes to index files, leaving out any +// packs whose ID is contained in packBlacklist from finalized indexes. +// It is only intended for use by prune with the UnsafeRecovery option. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error { p.SetMax(uint64(len(mi.Packs(excludePacks)))) mi.idxMutex.Lock() @@ -334,33 +506,23 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks) - newIndex := NewIndex() - obsolete := restic.NewIDSet(extraObsolete...) - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. + obsolete := restic.NewIDSet() wg, wgCtx := errgroup.WithContext(ctx) ch := make(chan *Index) - wg.Go(func() error { defer close(ch) - for i, idx := range mi.idx { + newIndex := NewIndex() + for _, idx := range mi.idx { if idx.Final() { ids, err := idx.IDs() if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return err + panic("internal error - finalized index without ID") } - debug.Log("adding index ids %v to supersedes field", ids) obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) } - debug.Log("adding index %d", i) - for pbs := range idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) p.Add(1) @@ -396,33 +558,18 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke return nil } - // encoding an index can take quite some time such that this can be both CPU- or IO-bound - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // keep concurrency bounded as we're on a fallback path + workerCount := int(repo.Connections()) // run workers on ch for i := 0; i < workerCount; i++ { wg.Go(worker) } err := wg.Wait() p.Done() - if err != nil { - return err - } + // the index no longer matches to stored state + mi.clear() - if opts.SkipDeletion { - return nil - } - - p = nil - if opts.DeleteProgress != nil { - p = opts.DeleteProgress() - } - defer p.Done() - return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { - if opts.DeleteReport != nil { - opts.DeleteReport(id, err) - } - return err - }, p) + return err } // SaveIndex saves an index in the repository. diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index 41f4cc534..b8a29262e 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -364,7 +364,7 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, index.MasterIndexSaveOpts{})) + rtest.OK(t, idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})) idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 712986e61..895b07994 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -522,7 +522,7 @@ func (plan *PrunePlan) Stats() PruneStats { // - rebuild the index while ignoring all files that will be deleted // - delete the files // plan.removePacks and plan.ignorePacks are modified in this function. -func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) { +func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) error { if plan.opts.DryRun { printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n") if len(plan.removePacksFirst) > 0 { @@ -581,12 +581,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") indexFiles := repo.idx.IDs() - err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) + err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } } else if len(plan.ignorePacks) != 0 { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer) + err := rewriteIndexFiles(ctx, repo, plan.ignorePacks, nil, nil, printer) if err != nil { return errors.Fatalf("%s", err) } @@ -601,16 +601,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e } if plan.opts.UnsafeRecovery { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) + err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed")) if err != nil { return errors.Fatalf("%s", err) } } - if err != nil { - return err - } - // drop outdated in-memory index repo.clearIndex() diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index 4ac6cdd3a..e01131923 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -62,6 +62,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } + oldIndexes := repo.idx.IDs() + printer.P("getting pack files to read...\n") err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { size, ok := packSizeFromIndex[id] @@ -103,7 +105,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } - err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer) + if err := repo.Flush(ctx); err != nil { + return err + } + + err = rewriteIndexFiles(ctx, repo, removePacks, oldIndexes, obsoleteIndexes, printer) if err != nil { return err } @@ -113,11 +119,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, return nil } -func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { +func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, printer progress.Printer) error { printer.P("rebuilding index\n") - bar := printer.NewCounter("packs processed") - return repo.idx.Save(ctx, repo, removePacks, extraObsolete, index.MasterIndexSaveOpts{ + bar := printer.NewCounter("indexes processed") + return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") @@ -129,6 +135,5 @@ func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic printer.VV("removed index %v\n", id.String()) } }, - SkipDeletion: skipDeletion, }) } diff --git a/internal/repository/repair_index_test.go b/internal/repository/repair_index_test.go index 79922e9ec..ac47d59ff 100644 --- a/internal/repository/repair_index_test.go +++ b/internal/repository/repair_index_test.go @@ -30,10 +30,6 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, ReadAllPacks: readAllPacks, }, &progress.NoopPrinter{})) - newIndexes := listIndex(t, repo) - old := indexes.Intersect(newIndexes) - rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old) - checker.TestCheckRepo(t, repo, true) } diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 7cb9d9f3e..811388cc9 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -56,7 +56,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe } // remove salvaged packs from index - err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer) + err = rewriteIndexFiles(ctx, repo, ids, nil, nil, printer) if err != nil { return err }