diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index dbf1faa21..247d922de 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -239,17 +239,17 @@ func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { // SaveDir stores a directory in the repo and returns the node. snPath is the // path within the current snapshot. -func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree, complete CompleteFunc) (d FutureTree, err error) { +func (arch *Archiver) SaveDir(ctx context.Context, snPath string, dir string, fi os.FileInfo, previous *restic.Tree, complete CompleteFunc) (d FutureNode, err error) { debug.Log("%v %v", snPath, dir) treeNode, err := arch.nodeFromFileInfo(dir, fi) if err != nil { - return FutureTree{}, err + return FutureNode{}, err } names, err := readdirnames(arch.FS, dir, fs.O_NOFOLLOW) if err != nil { - return FutureTree{}, err + return FutureNode{}, err } sort.Strings(names) @@ -259,7 +259,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo // test if context has been cancelled if ctx.Err() != nil { debug.Log("context has been cancelled, aborting") - return FutureTree{}, ctx.Err() + return FutureNode{}, ctx.Err() } pathname := arch.FS.Join(dir, name) @@ -275,7 +275,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo continue } - return FutureTree{}, err + return FutureNode{}, err } if excluded { @@ -285,50 +285,58 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo nodes = append(nodes, fn) } - ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes, complete) + fn := arch.treeSaver.Save(ctx, snPath, dir, treeNode, nodes, complete) - return ft, nil + return fn, nil } -// FutureNode holds a reference to a node, FutureFile, or FutureTree. +// FutureNode holds a reference to a channel that returns a FutureNodeResult +// or a reference to an already existing result. If the result is available +// immediatelly, then storing a reference directly requires less memory than +// using the indirection via a channel. type FutureNode struct { + ch <-chan futureNodeResult + res *futureNodeResult +} + +type futureNodeResult struct { snPath, target string node *restic.Node stats ItemStats err error - - isFile bool - file FutureFile - isTree bool - tree FutureTree } -func (fn *FutureNode) wait(ctx context.Context) { - switch { - case fn.isFile: - // wait for and collect the data for the file - fn.file.Wait(ctx) - fn.node = fn.file.Node() - fn.err = fn.file.Err() - fn.stats = fn.file.Stats() +func newFutureNode() (FutureNode, chan<- futureNodeResult) { + ch := make(chan futureNodeResult, 1) + return FutureNode{ch: ch}, ch +} - // ensure the other stuff can be garbage-collected - fn.file = FutureFile{} - fn.isFile = false - - case fn.isTree: - // wait for and collect the data for the dir - fn.tree.Wait(ctx) - fn.node = fn.tree.Node() - fn.stats = fn.tree.Stats() - - // ensure the other stuff can be garbage-collected - fn.tree = FutureTree{} - fn.isTree = false +func newFutureNodeWithResult(res futureNodeResult) FutureNode { + return FutureNode{ + res: &res, } } +func (fn *FutureNode) take(ctx context.Context) futureNodeResult { + if fn.res != nil { + res := fn.res + // free result + fn.res = nil + return *res + } + select { + case res, ok := <-fn.ch: + if ok { + // free channel + fn.ch = nil + return res + } + case <-ctx.Done(): + } + return futureNodeResult{} +} + // allBlobsPresent checks if all blobs (contents) of the given node are // present in the index. func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { @@ -351,11 +359,6 @@ func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { start := time.Now() - fn = FutureNode{ - snPath: snPath, - target: target, - } - debug.Log("%v target %q, previous %v", snPath, target, previous) abstarget, err := arch.FS.Abs(target) if err != nil { @@ -395,14 +398,19 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous debug.Log("%v hasn't changed, using old list of blobs", target) arch.CompleteItem(snPath, previous, previous, ItemStats{}, time.Since(start)) arch.CompleteBlob(snPath, previous.Size) - fn.node, err = arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(target, fi) if err != nil { return FutureNode{}, false, err } // copy list of blobs - fn.node.Content = previous.Content + node.Content = previous.Content + fn = newFutureNodeWithResult(futureNodeResult{ + snPath: snPath, + target: target, + node: node, + }) return fn, false, nil } @@ -449,9 +457,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous return FutureNode{}, true, nil } - fn.isFile = true // Save will close the file, we don't need to do that - fn.file = arch.fileSaver.Save(ctx, snPath, file, fi, func() { + fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() { arch.StartFile(snPath) }, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) @@ -470,8 +477,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous return FutureNode{}, false, err } - fn.isTree = true - fn.tree, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree, + fn, err = arch.SaveDir(ctx, snPath, target, fi, oldSubtree, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snItem, previous, node, stats, time.Since(start)) }) @@ -487,10 +493,15 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous default: debug.Log(" %v other", target) - fn.node, err = arch.nodeFromFileInfo(target, fi) + node, err := arch.nodeFromFileInfo(target, fi) if err != nil { return FutureNode{}, false, err } + fn = newFutureNodeWithResult(futureNodeResult{ + snPath: snPath, + target: target, + node: node, + }) } debug.Log("return after %.3f", time.Since(start).Seconds()) @@ -647,28 +658,28 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, // process all futures for name, fn := range futureNodes { - fn.wait(ctx) + fnr := fn.take(ctx) // return the error, or ignore it - if fn.err != nil { - fn.err = arch.error(fn.target, fn.err) - if fn.err == nil { + if fnr.err != nil { + fnr.err = arch.error(fnr.target, fnr.err) + if fnr.err == nil { // ignore error continue } - return nil, fn.err + return nil, fnr.err } // when the error is ignored, the node could not be saved, so ignore it - if fn.node == nil { - debug.Log("%v excluded: %v", fn.snPath, fn.target) + if fnr.node == nil { + debug.Log("%v excluded: %v", fnr.snPath, fnr.target) continue } - fn.node.Name = name + fnr.node.Name = name - err := tree.Insert(fn.node) + err := tree.Insert(fnr.node) if err != nil { return nil, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index a5ba9262c..a6485234f 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -80,11 +80,11 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(err) } - res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete) + res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete) - res.Wait(ctx) - if res.Err() != nil { - t.Fatal(res.Err()) + fnr := res.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } arch.stopWorkers() @@ -109,15 +109,15 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Errorf("no node returned for complete callback") } - if completeCallbackNode != nil && !res.Node().Equals(*completeCallbackNode) { + if completeCallbackNode != nil && !fnr.node.Equals(*completeCallbackNode) { t.Errorf("different node returned for complete callback") } - if completeCallbackStats != res.Stats() { - t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", res.Stats(), completeCallbackStats) + if completeCallbackStats != fnr.stats { + t.Errorf("different stats return for complete callback, want:\n %v\ngot:\n %v", fnr.stats, completeCallbackStats) } - return res.Node(), res.Stats() + return fnr.node, fnr.stats } func TestArchiverSaveFile(t *testing.T) { @@ -232,16 +232,16 @@ func TestArchiverSave(t *testing.T) { t.Errorf("Save() excluded the node, that's unexpected") } - node.wait(ctx) - if node.err != nil { - t.Fatal(node.err) + fnr := node.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } - if node.node == nil { + if fnr.node == nil { t.Fatalf("returned node is nil") } - stats := node.stats + stats := fnr.stats arch.stopWorkers() err = repo.Flush(ctx) @@ -249,7 +249,7 @@ func TestArchiverSave(t *testing.T) { t.Fatal(err) } - TestEnsureFileContent(ctx, t, repo, "file", node.node, testfile) + TestEnsureFileContent(ctx, t, repo, "file", fnr.node, testfile) if stats.DataSize != uint64(len(testfile.Content)) { t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(testfile.Content), stats.DataSize) } @@ -311,16 +311,16 @@ func TestArchiverSaveReaderFS(t *testing.T) { t.Errorf("Save() excluded the node, that's unexpected") } - node.wait(ctx) - if node.err != nil { - t.Fatal(node.err) + fnr := node.take(ctx) + if fnr.err != nil { + t.Fatal(fnr.err) } - if node.node == nil { + if fnr.node == nil { t.Fatalf("returned node is nil") } - stats := node.stats + stats := fnr.stats arch.stopWorkers() err = repo.Flush(ctx) @@ -328,7 +328,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { t.Fatal(err) } - TestEnsureFileContent(ctx, t, repo, "file", node.node, TestFile{Content: test.Data}) + TestEnsureFileContent(ctx, t, repo, "file", fnr.node, TestFile{Content: test.Data}) if stats.DataSize != uint64(len(test.Data)) { t.Errorf("wrong stats returned in DataSize, want %d, got %d", len(test.Data), stats.DataSize) } @@ -851,13 +851,13 @@ func TestArchiverSaveDir(t *testing.T) { t.Fatal(err) } - ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil, nil) + ft, err := arch.SaveDir(ctx, "/", test.target, fi, nil, nil) if err != nil { t.Fatal(err) } - ft.Wait(ctx) - node, stats := ft.Node(), ft.Stats() + fnr := ft.take(ctx) + node, stats := fnr.node, fnr.stats t.Logf("stats: %v", stats) if stats.DataSize != 0 { @@ -928,13 +928,13 @@ func TestArchiverSaveDirIncremental(t *testing.T) { t.Fatal(err) } - ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil, nil) + ft, err := arch.SaveDir(ctx, "/", tempdir, fi, nil, nil) if err != nil { t.Fatal(err) } - ft.Wait(ctx) - node, stats := ft.Node(), ft.Stats() + fnr := ft.take(ctx) + node, stats := fnr.node, fnr.stats if err != nil { t.Fatal(err) diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 0ec871e8b..1e6eea979 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -13,41 +13,6 @@ import ( "golang.org/x/sync/errgroup" ) -// FutureFile is returned by Save and will return the data once it -// has been processed. -type FutureFile struct { - ch <-chan saveFileResponse - res saveFileResponse -} - -// Wait blocks until the result of the save operation is received or ctx is -// cancelled. -func (s *FutureFile) Wait(ctx context.Context) { - select { - case res, ok := <-s.ch: - if ok { - s.res = res - } - case <-ctx.Done(): - return - } -} - -// Node returns the node once it is available. -func (s *FutureFile) Node() *restic.Node { - return s.res.node -} - -// Stats returns the stats for the file once they are available. -func (s *FutureFile) Stats() ItemStats { - return s.res.stats -} - -// Err returns the error in case an error occurred. -func (s *FutureFile) Err() error { - return s.res.err -} - // SaveBlobFn saves a blob to a repo. type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob @@ -102,10 +67,11 @@ type CompleteFunc func(*restic.Node, ItemStats) // Save stores the file f and returns the data once it has been completed. The // file is closed by Save. -func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { - ch := make(chan saveFileResponse, 1) +func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode { + fn, ch := newFutureNode() job := saveFileJob{ snPath: snPath, + target: target, file: file, fi: fi, start: start, @@ -121,41 +87,42 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os close(ch) } - return FutureFile{ch: ch} + return fn } type saveFileJob struct { snPath string + target string file fs.File fi os.FileInfo - ch chan<- saveFileResponse + ch chan<- futureNodeResult complete CompleteFunc start func() } -type saveFileResponse struct { - node *restic.Node - stats ItemStats - err error -} - // saveFile stores the file f in the repo, then closes it. -func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse { +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult { start() stats := ItemStats{} + fnr := futureNodeResult{ + snPath: snPath, + target: target, + } debug.Log("%v", snPath) node, err := s.NodeFromFileInfo(f.Name(), fi) if err != nil { _ = f.Close() - return saveFileResponse{err: err} + fnr.err = err + return fnr } if node.Type != "file" { _ = f.Close() - return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)} + fnr.err = errors.Errorf("node type %q is wrong", node.Type) + return fnr } // reuse the chunker @@ -179,13 +146,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat if err != nil { _ = f.Close() - return saveFileResponse{err: err} + fnr.err = err + return fnr } // test if the context has been cancelled, return the error if ctx.Err() != nil { _ = f.Close() - return saveFileResponse{err: ctx.Err()} + fnr.err = ctx.Err() + return fnr } res := s.saveBlob(ctx, restic.DataBlob, buf) @@ -194,7 +163,8 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat // test if the context has been cancelled, return the error if ctx.Err() != nil { _ = f.Close() - return saveFileResponse{err: ctx.Err()} + fnr.err = ctx.Err() + return fnr } s.CompleteBlob(f.Name(), uint64(len(chunk.Data))) @@ -202,7 +172,8 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat err = f.Close() if err != nil { - return saveFileResponse{err: err} + fnr.err = err + return fnr } for _, res := range results { @@ -217,11 +188,9 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } node.Size = size - - return saveFileResponse{ - node: node, - stats: stats, - } + fnr.node = node + fnr.stats = stats + return fnr } func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { @@ -239,7 +208,8 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { return } } - res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start) + + res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start) if job.complete != nil { job.complete(res.node, res.stats) } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index 497882fcb..0bdb8ad50 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -64,7 +64,7 @@ func TestFileSaver(t *testing.T) { testFs := fs.Local{} s, ctx, wg := startFileSaver(ctx, t) - var results []FutureFile + var results []FutureNode for _, filename := range files { f, err := testFs.Open(filename) @@ -77,14 +77,14 @@ func TestFileSaver(t *testing.T) { t.Fatal(err) } - ff := s.Save(ctx, filename, f, fi, startFn, completeFn) + ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn) results = append(results, ff) } for _, file := range results { - file.Wait(ctx) - if file.Err() != nil { - t.Errorf("unable to save file: %v", file.Err()) + fnr := file.take(ctx) + if fnr.err != nil { + t.Errorf("unable to save file: %v", fnr.err) } } diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 4ed033fac..221df85e1 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -8,35 +8,6 @@ import ( "golang.org/x/sync/errgroup" ) -// FutureTree is returned by Save and will return the data once it -// has been processed. -type FutureTree struct { - ch <-chan saveTreeResponse - res saveTreeResponse -} - -// Wait blocks until the data has been received or ctx is cancelled. -func (s *FutureTree) Wait(ctx context.Context) { - select { - case <-ctx.Done(): - return - case res, ok := <-s.ch: - if ok { - s.res = res - } - } -} - -// Node returns the node. -func (s *FutureTree) Node() *restic.Node { - return s.res.node -} - -// Stats returns the stats for the file. -func (s *FutureTree) Stats() ItemStats { - return s.res.stats -} - // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) @@ -70,10 +41,11 @@ func (s *TreeSaver) TriggerShutdown() { } // Save stores the dir d and returns the data once it has been completed. -func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureTree { - ch := make(chan saveTreeResponse, 1) +func (s *TreeSaver) Save(ctx context.Context, snPath string, target string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureNode { + fn, ch := newFutureNode() job := saveTreeJob{ snPath: snPath, + target: target, node: node, nodes: nodes, ch: ch, @@ -86,51 +58,53 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, close(ch) } - return FutureTree{ch: ch} + return fn } type saveTreeJob struct { snPath string - nodes []FutureNode + target string node *restic.Node - ch chan<- saveTreeResponse + nodes []FutureNode + ch chan<- futureNodeResult complete CompleteFunc } -type saveTreeResponse struct { - node *restic.Node - stats ItemStats -} - // save stores the nodes as a tree in the repo. -func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) { +func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, ItemStats, error) { var stats ItemStats + node := job.node + nodes := job.nodes + // allow GC of nodes array once the loop is finished + job.nodes = nil tree := restic.NewTree(len(nodes)) - for _, fn := range nodes { - fn.wait(ctx) + for i, fn := range nodes { + // fn is a copy, so clear the original value explicitly + nodes[i] = FutureNode{} + fnr := fn.take(ctx) // return the error if it wasn't ignored - if fn.err != nil { - debug.Log("err for %v: %v", fn.snPath, fn.err) - fn.err = s.errFn(fn.target, fn.err) - if fn.err == nil { + if fnr.err != nil { + debug.Log("err for %v: %v", fnr.snPath, fnr.err) + fnr.err = s.errFn(fnr.target, fnr.err) + if fnr.err == nil { // ignore error continue } - return nil, stats, fn.err + return nil, stats, fnr.err } // when the error is ignored, the node could not be saved, so ignore it - if fn.node == nil { - debug.Log("%v excluded: %v", fn.snPath, fn.target) + if fnr.node == nil { + debug.Log("%v excluded: %v", fnr.snPath, fnr.target) continue } - debug.Log("insert %v", fn.node.Name) - err := tree.Insert(fn.node) + debug.Log("insert %v", fnr.node.Name) + err := tree.Insert(fnr.node) if err != nil { return nil, stats, err } @@ -158,7 +132,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { return nil } } - node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes) + + node, stats, err := s.save(ctx, &job) if err != nil { debug.Log("error saving tree blob: %v", err) close(job.ch) @@ -168,9 +143,11 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { if job.complete != nil { job.complete(node, stats) } - job.ch <- saveTreeResponse{ - node: node, - stats: stats, + job.ch <- futureNodeResult{ + snPath: job.snPath, + target: job.target, + node: node, + stats: stats, } close(job.ch) } diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index e7314e8f8..7a152ff0c 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -28,19 +28,19 @@ func TestTreeSaver(t *testing.T) { b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) - var results []FutureTree + var results []FutureNode for i := 0; i < 20; i++ { node := &restic.Node{ Name: fmt.Sprintf("file-%d", i), } - fb := b.Save(ctx, "/", node, nil, nil) + fb := b.Save(ctx, "/", node.Name, node, nil, nil) results = append(results, fb) } for _, tree := range results { - tree.Wait(ctx) + tree.take(ctx) } b.TriggerShutdown() @@ -89,19 +89,19 @@ func TestTreeSaverError(t *testing.T) { b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) - var results []FutureTree + var results []FutureNode for i := 0; i < test.trees; i++ { node := &restic.Node{ Name: fmt.Sprintf("file-%d", i), } - fb := b.Save(ctx, "/", node, nil, nil) + fb := b.Save(ctx, "/", node.Name, node, nil, nil) results = append(results, fb) } for _, tree := range results { - tree.Wait(ctx) + tree.take(ctx) } b.TriggerShutdown()