From 7e6174126f5d01547cee13a2cdae902e837298c6 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 12 Jul 2015 00:25:42 +0200 Subject: [PATCH] checker: run Packs() in parallel --- checker/checker.go | 103 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 20 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index c449bde9e..aff7c1ef7 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "errors" "fmt" + "sync" "github.com/restic/restic" "github.com/restic/restic/backend" @@ -63,7 +64,7 @@ func New(repo *repository.Repository) *Checker { } } -const loadIndexParallelism = 20 +const defaultParallelism = 20 // LoadIndex loads all index files. func (c *Checker) LoadIndex() error { @@ -94,7 +95,7 @@ func (c *Checker) LoadIndex() error { go func() { defer close(indexCh) debug.Log("LoadIndex", "start loading indexes in parallel") - perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, loadIndexParallelism, worker) + perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism, worker) debug.Log("LoadIndex", "loading indexes finished, error: %v", perr) }() @@ -140,33 +141,95 @@ func (e PackError) Error() string { return "pack " + e.ID.String() + ": " + e.error.Error() } +func packIDTester(repo *repository.Repository, inChan <-chan mapID, errChan chan<- error, wg *sync.WaitGroup, done <-chan struct{}) { + debug.Log("Checker.testPackID", "worker start") + defer debug.Log("Checker.testPackID", "worker done") + + defer wg.Done() + + for id := range inChan { + ok, err := repo.Backend().Test(backend.Data, map2str(id)) + if err != nil { + err = PackError{map2id(id), err} + } else { + if !ok { + err = PackError{map2id(id), errors.New("does not exist")} + } + } + + if err != nil { + debug.Log("Checker.testPackID", "error checking for pack %s: %v", map2id(id).Str(), err) + select { + case <-done: + return + case errChan <- err: + } + + continue + } + + debug.Log("Checker.testPackID", "pack %s exists", map2id(id).Str()) + } +} + +func collectErrors(in <-chan error, out chan<- []error, done <-chan struct{}) { + var errs []error + +outer: + for { + select { + case err, ok := <-in: + if !ok { + break outer + } + errs = append(errs, err) + case <-done: + break outer + } + } + + out <- errs +} + // Packs checks that all packs referenced in the index are still available and // there are no packs that aren't in an index. -func (c *Checker) Packs() (errs []error) { +func (c *Checker) Packs() []error { debug.Log("Checker.Packs", "checking for %d packs", len(c.packs)) seenPacks := make(map[mapID]struct{}) - for id := range c.packs { - seenPacks[id] = struct{}{} - ok, err := c.repo.Backend().Test(backend.Data, map2str(id)) - if err != nil { - debug.Log("Checker.Packs", "error checking for pack %s", map2id(id).Str()) - errs = append(errs, PackError{map2id(id), err}) - continue - } - - if !ok { - debug.Log("Checker.Packs", "pack %s does not exist", map2id(id).Str()) - errs = append(errs, PackError{map2id(id), errors.New("does not exist")}) - continue - } - debug.Log("Checker.Packs", "pack %s exists", map2id(id).Str()) - } - done := make(chan struct{}) defer close(done) + var workerWG sync.WaitGroup + + IDChan := make(chan mapID) + errChan := make(chan error) + + for i := 0; i < defaultParallelism; i++ { + workerWG.Add(1) + go packIDTester(c.repo, IDChan, errChan, &workerWG, done) + } + + errsChan := make(chan []error, 1) + + go collectErrors(errChan, errsChan, done) + + for id := range c.packs { + seenPacks[id] = struct{}{} + IDChan <- id + } + close(IDChan) + + debug.Log("Checker.Packs", "waiting for %d workers to terminate", defaultParallelism) + workerWG.Wait() + debug.Log("Checker.Packs", "workers terminated") + close(errChan) + + errs := <-errsChan + debug.Log("Checker.Packs", "error worker terminated") + for id := range c.repo.List(backend.Data, done) { + debug.Log("Checker.Packs", "check data blob %v", id) if _, ok := seenPacks[id2map(id)]; !ok { errs = append(errs, PackError{id, errors.New("not referenced in any index")}) }