From 96904f897281c074a441f381efcaeb636c122877 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 18 Dec 2020 19:37:08 +0100 Subject: [PATCH 1/5] check: extract parallel index loading --- internal/checker/checker.go | 118 ++++++-------------------- internal/repository/index_parallel.go | 77 +++++++++++++++++ 2 files changed, 104 insertions(+), 91 deletions(-) create mode 100644 internal/repository/index_parallel.go diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 309f932f5..d6c160abd 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -74,102 +74,38 @@ func (err ErrOldIndexFormat) Error() string { func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("Start") - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, wgCtx := errgroup.WithContext(ctx) - - type FileInfo struct { - restic.ID - Size int64 - } - - type Result struct { - *repository.Index - restic.ID - Err error - } - - ch := make(chan FileInfo) - resultCh := make(chan Result) - - // send list of index files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error { - select { - case <-wgCtx.Done(): - return nil - case ch <- FileInfo{id, size}: - } - return nil - }) - }) - - // a worker receives an index ID from ch, loads the index, and sends it to indexCh - worker := func() error { - var buf []byte - for fi := range ch { - debug.Log("worker got file %v", fi.ID.Str()) - var err error - var idx *repository.Index - oldFormat := false - - buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID) - if err == nil { - idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID) - } - - if oldFormat { - debug.Log("index %v has old format", fi.ID.Str()) - hints = append(hints, ErrOldIndexFormat{fi.ID}) - } - - err = errors.Wrapf(err, "error loading index %v", fi.ID.Str()) - - select { - case resultCh <- Result{idx, fi.ID, err}: - case <-wgCtx.Done(): - } - } - return nil - } - - // run workers on ch - wg.Go(func() error { - defer close(resultCh) - return repository.RunWorkers(defaultParallelism, worker) - }) - - // receive decoded indexes packToIndex := make(map[restic.ID]restic.IDSet) - wg.Go(func() error { - for res := range resultCh { - debug.Log("process index %v, err %v", res.ID, res.Err) + err := repository.ForAllIndexes(ctx, c.repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error { + debug.Log("process index %v, err %v", id, err) - if res.Err != nil { - errs = append(errs, res.Err) - continue - } - - c.masterIndex.Insert(res.Index) - - debug.Log("process blobs") - cnt := 0 - for blob := range res.Index.Each(wgCtx) { - cnt++ - - if _, ok := packToIndex[blob.PackID]; !ok { - packToIndex[blob.PackID] = restic.NewIDSet() - } - packToIndex[blob.PackID].Insert(res.ID) - } - - debug.Log("%d blobs processed", cnt) + if oldFormat { + debug.Log("index %v has old format", id.Str()) + hints = append(hints, ErrOldIndexFormat{id}) } + + err = errors.Wrapf(err, "error loading index %v", id.Str()) + + if err != nil { + errs = append(errs, err) + return nil + } + + c.masterIndex.Insert(index) + + debug.Log("process blobs") + cnt := 0 + for blob := range index.Each(ctx) { + cnt++ + + if _, ok := packToIndex[blob.PackID]; !ok { + packToIndex[blob.PackID] = restic.NewIDSet() + } + packToIndex[blob.PackID].Insert(id) + } + + debug.Log("%d blobs processed", cnt) return nil }) - - err := wg.Wait() if err != nil { errs = append(errs, err) } diff --git a/internal/repository/index_parallel.go b/internal/repository/index_parallel.go new file mode 100644 index 000000000..ffe03d427 --- /dev/null +++ b/internal/repository/index_parallel.go @@ -0,0 +1,77 @@ +package repository + +import ( + "context" + "sync" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +const loadIndexParallelism = 5 + +// ForAllIndexes loads all index files in parallel and calls the given callback. +// It is guaranteed that the function is not run concurrently. If the callback +// returns an error, this function is cancelled and also returns that error. +func ForAllIndexes(ctx context.Context, repo restic.Repository, + fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error { + + debug.Log("Start") + + type FileInfo struct { + restic.ID + Size int64 + } + + var m sync.Mutex + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + ch := make(chan FileInfo) + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } + return nil + }) + }) + + // a worker receives an index ID from ch, loads the index, and sends it to indexCh + worker := func() error { + var buf []byte + for fi := range ch { + debug.Log("worker got file %v", fi.ID.Str()) + var err error + var idx *Index + oldFormat := false + + buf, err = repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID) + if err == nil { + idx, oldFormat, err = DecodeIndex(buf, fi.ID) + } + + m.Lock() + err = fn(fi.ID, idx, oldFormat, err) + m.Unlock() + if err != nil { + return err + } + } + return nil + } + + // run workers on ch + wg.Go(func() error { + return RunWorkers(loadIndexParallelism, worker) + }) + + return wg.Wait() +} From ccc84af73d98728fa6f767a3ab018781faab54cd Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 18:19:25 +0100 Subject: [PATCH 2/5] debug/list: parallelize index loading --- cmd/restic/cmd_debug.go | 4 +--- cmd/restic/cmd_list.go | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 0eb1c8e87..3403b259a 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -110,10 +110,8 @@ func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer) } func dumpIndexes(ctx context.Context, repo restic.Repository, wr io.Writer) error { - return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + return repository.ForAllIndexes(ctx, repo, func(id restic.ID, idx *repository.Index, oldFormat bool, err error) error { Printf("index_id: %v\n", id) - - idx, err := repository.LoadIndex(ctx, repo, id) if err != nil { return err } diff --git a/cmd/restic/cmd_list.go b/cmd/restic/cmd_list.go index 50b55e857..af711da3d 100644 --- a/cmd/restic/cmd_list.go +++ b/cmd/restic/cmd_list.go @@ -60,8 +60,7 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error { case "locks": t = restic.LockFile case "blobs": - return repo.List(opts.ctx, restic.IndexFile, func(id restic.ID, size int64) error { - idx, err := repository.LoadIndex(opts.ctx, repo, id) + return repository.ForAllIndexes(opts.ctx, repo, func(id restic.ID, idx *repository.Index, oldFormat bool, err error) error { if err != nil { return err } @@ -70,7 +69,6 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error { } return nil }) - default: return errors.Fatal("invalid type") } From 24474a36f4a69e74a1e9c6fb0ac1d99a13917322 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 18:50:19 +0100 Subject: [PATCH 3/5] repository: deduplicate index loading implementation --- internal/repository/repository.go | 83 ++++++------------------------- 1 file changed, 15 insertions(+), 68 deletions(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index fbc72726f..c534fb565 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -434,88 +434,35 @@ func (r *Repository) SaveFullIndex(ctx context.Context) error { return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...) } -const loadIndexParallelism = 4 - // LoadIndex loads all index files from the backend in parallel and stores them // in the master index. The first error that occurred is returned. func (r *Repository) LoadIndex(ctx context.Context) error { debug.Log("Loading index") - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) - - type FileInfo struct { - restic.ID - Size int64 - } - ch := make(chan FileInfo) - indexCh := make(chan *Index) - - // send list of index files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { - select { - case <-ctx.Done(): - return nil - case ch <- FileInfo{id, size}: - } - return nil - }) - }) - - // a worker receives an index ID from ch, loads the index, and sends it to indexCh - worker := func() error { - var buf []byte - for fi := range ch { - var err error - buf, err = r.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID) - if err != nil { - return errors.Wrapf(err, "unable to load index %s", fi.ID.Str()) - } - idx, _, err := DecodeIndex(buf, fi.ID) - if err != nil { - return errors.Wrapf(err, "unable to decode index %s", fi.ID.Str()) - } - - select { - case indexCh <- idx: - case <-ctx.Done(): - } - } - - return nil - } - - // run workers on ch - wg.Go(func() error { - defer close(indexCh) - return RunWorkers(loadIndexParallelism, worker) - }) - - // receive decoded indexes validIndex := restic.NewIDSet() - wg.Go(func() error { - for idx := range indexCh { - ids, err := idx.IDs() - if err == nil { - for _, id := range ids { - validIndex.Insert(id) - } - } - - r.idx.Insert(idx) + err := ForAllIndexes(ctx, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error { + if err != nil { + return err } - r.idx.MergeFinalIndexes() + + ids, err := idx.IDs() + if err != nil { + return err + } + + for _, id := range ids { + validIndex.Insert(id) + } + r.idx.Insert(idx) return nil }) - err := wg.Wait() if err != nil { return errors.Fatal(err.Error()) } + r.idx.MergeFinalIndexes() + // remove index files from the cache which have been removed in the repo return r.PrepareCache(validIndex) } From a12c5f1d3725a52afe003a7d22904008b6fb3cf7 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 18:53:59 +0100 Subject: [PATCH 4/5] repository: move otherwise unused LoadIndex to tests --- internal/repository/repository.go | 14 -------------- internal/repository/repository_test.go | 20 ++++++++++++++++++-- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index c534fb565..a0d27cc3f 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -595,20 +595,6 @@ func (r *Repository) PrepareCache(indexIDs restic.IDSet) error { return nil } -// LoadIndex loads the index id from backend and returns it. -func LoadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*Index, error) { - buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id) - if err != nil { - return nil, err - } - - idx, oldFormat, err := DecodeIndex(buf, id) - if oldFormat { - fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str()) - } - return idx, err -} - // SearchKey finds a key with the supplied password, afterwards the config is // read and parsed. It tries at most maxKeys key files in the repo. func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int, keyHint string) error { diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 1e485320d..22a424556 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "crypto/sha256" + "fmt" "io" "math/rand" + "os" "path/filepath" "testing" "time" @@ -293,6 +295,20 @@ func TestRepositoryLoadIndex(t *testing.T) { rtest.OK(t, repo.LoadIndex(context.TODO())) } +// loadIndex loads the index id from backend and returns it. +func loadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*repository.Index, error) { + buf, err := repo.LoadAndDecrypt(ctx, nil, restic.IndexFile, id) + if err != nil { + return nil, err + } + + idx, oldFormat, err := repository.DecodeIndex(buf, id) + if oldFormat { + fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str()) + } + return idx, err +} + func BenchmarkLoadIndex(b *testing.B) { repository.TestUseLowSecurityKDFParameters(b) @@ -323,7 +339,7 @@ func BenchmarkLoadIndex(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := repository.LoadIndex(context.TODO(), repo, id) + _, err := loadIndex(context.TODO(), repo, id) rtest.OK(b, err) } } @@ -373,7 +389,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { packEntries := make(map[restic.ID]map[restic.ID]struct{}) err := repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { - idx, err := repository.LoadIndex(context.TODO(), repo, id) + idx, err := loadIndex(context.TODO(), repo, id) rtest.OK(t, err) for pb := range idx.Each(context.TODO()) { From b9f5d3fe1361798d31c63c310598110187b67697 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 18 Dec 2020 20:25:30 +0100 Subject: [PATCH 5/5] repository: Add test for ForAllIndexes --- internal/repository/index_parallel_test.go | 46 ++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 internal/repository/index_parallel_test.go diff --git a/internal/repository/index_parallel_test.go b/internal/repository/index_parallel_test.go new file mode 100644 index 000000000..0202be05c --- /dev/null +++ b/internal/repository/index_parallel_test.go @@ -0,0 +1,46 @@ +package repository_test + +import ( + "context" + "testing" + + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" + "github.com/restic/restic/internal/restic" + rtest "github.com/restic/restic/internal/test" +) + +func TestRepositoryForAllIndexes(t *testing.T) { + repodir, cleanup := rtest.Env(t, repoFixture) + defer cleanup() + + repo := repository.TestOpenLocal(t, repodir) + + expectedIndexIDs := restic.NewIDSet() + rtest.OK(t, repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { + expectedIndexIDs.Insert(id) + return nil + })) + + // check that all expected indexes are loaded without errors + indexIDs := restic.NewIDSet() + var indexErr error + rtest.OK(t, repository.ForAllIndexes(context.TODO(), repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error { + if err != nil { + indexErr = err + } + indexIDs.Insert(id) + return nil + })) + rtest.OK(t, indexErr) + rtest.Equals(t, expectedIndexIDs, indexIDs) + + // must failed with the returned error + iterErr := errors.New("error to pass upwards") + + err := repository.ForAllIndexes(context.TODO(), repo, func(id restic.ID, index *repository.Index, oldFormat bool, err error) error { + return iterErr + }) + + rtest.Equals(t, iterErr, err) +}