From 1d7bb01a6b606d7d44627f5fcf2d3d33a7976f1f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 00:23:45 +0100 Subject: [PATCH] check: Cleanup tree loading and switch to use errgroup The helper methods are now wired up in the Structure method. --- internal/checker/checker.go | 145 ++++++++++++++---------------------- 1 file changed, 54 insertions(+), 91 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 23564ea45..9c6922a36 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -316,115 +316,56 @@ type treeJob struct { // loadTreeWorker loads trees from repo and sends them to out. func loadTreeWorker(ctx context.Context, repo restic.Repository, - in <-chan restic.ID, out chan<- treeJob, - wg *sync.WaitGroup) { + in <-chan restic.ID, out chan<- treeJob) { - defer func() { - debug.Log("exiting") - wg.Done() - }() + for treeID := range in { + tree, err := repo.LoadTree(ctx, treeID) + debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) + job := treeJob{ID: treeID, error: err, Tree: tree} - var ( - inCh = in - outCh = out - job treeJob - ) - - outCh = nil - for { select { case <-ctx.Done(): return - - case treeID, ok := <-inCh: - if !ok { - return - } - debug.Log("load tree %v", treeID) - - tree, err := repo.LoadTree(ctx, treeID) - debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) - job = treeJob{ID: treeID, error: err, Tree: tree} - outCh = out - inCh = nil - - case outCh <- job: - debug.Log("sent tree %v", job.ID) - outCh = nil - inCh = in + case out <- job: } } } // checkTreeWorker checks the trees received and sends out errors to errChan. -func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error, wg *sync.WaitGroup) { - defer func() { - debug.Log("exiting") - wg.Done() - }() +func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error) { + for job := range in { + debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) - var ( - inCh = in - outCh = out - treeError TreeError - ) + var errs []error + if job.error != nil { + errs = append(errs, job.error) + } else { + errs = c.checkTree(job.ID, job.Tree) + } - outCh = nil - for { + if len(errs) == 0 { + continue + } + treeError := TreeError{ID: job.ID, Errors: errs} select { case <-ctx.Done(): - debug.Log("done channel closed, exiting") return - - case job, ok := <-inCh: - if !ok { - debug.Log("input channel closed, exiting") - return - } - - debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) - - var errs []error - if job.error != nil { - errs = append(errs, job.error) - } else { - errs = c.checkTree(job.ID, job.Tree) - } - - if len(errs) > 0 { - debug.Log("checked tree %v: %v errors", job.ID, len(errs)) - treeError = TreeError{ID: job.ID, Errors: errs} - outCh = out - inCh = nil - } - - case outCh <- treeError: + case out <- treeError: debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors)) - outCh = nil - inCh = in } } } func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) { - defer func() { - debug.Log("closing output channels") - close(loaderChan) - close(out) - }() - var ( inCh = in - outCh = out - loadCh = loaderChan + outCh chan<- treeJob + loadCh chan<- restic.ID job treeJob nextTreeID restic.ID outstandingLoadTreeJobs = 0 ) - outCh = nil - loadCh = nil - for { if loadCh == nil && len(backlog) > 0 { // process last added ids first, that is traverse the tree in depth-first order @@ -528,8 +469,6 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids resti // subtrees are available in the index. errChan is closed after all trees have // been traversed. func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { - defer close(errChan) - trees, errs := loadSnapshotTreeIDs(ctx, c.repo) debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs)) @@ -541,18 +480,42 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { } } - treeIDChan := make(chan restic.ID) - treeJobChan1 := make(chan treeJob) - treeJobChan2 := make(chan treeJob) + loaderChan := make(chan restic.ID) + loadedTreeChan := make(chan treeJob) + treeStream := make(chan treeJob) + + wg, ctx := errgroup.WithContext(ctx) + var loadTreeWg sync.WaitGroup - var wg sync.WaitGroup for i := 0; i < defaultParallelism; i++ { - wg.Add(2) - go loadTreeWorker(ctx, c.repo, treeIDChan, treeJobChan1, &wg) - go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg) + loadTreeWg.Add(1) + wg.Go(func() error { + defer loadTreeWg.Done() + loadTreeWorker(ctx, c.repo, loaderChan, loadedTreeChan) + return nil + }) + } + // close once all loadTreeWorkers have completed + wg.Go(func() error { + loadTreeWg.Wait() + close(loadedTreeChan) + return nil + }) + + defer close(errChan) + for i := 0; i < defaultParallelism; i++ { + wg.Go(func() error { + c.checkTreeWorker(ctx, treeStream, errChan) + return nil + }) } - c.filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2) + wg.Go(func() error { + defer close(loaderChan) + defer close(treeStream) + c.filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream) + return nil + }) wg.Wait() }