diff --git a/archiver.go b/archiver.go index 285ba25c2..f525eef11 100644 --- a/archiver.go +++ b/archiver.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "sort" "sync" "github.com/juju/arrar" @@ -415,134 +416,201 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (Blob, error) { return blob, nil } -func (arch *Archiver) Snapshot(p *Progress, path string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { +func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) { + defer wg.Done() + for { + select { + case e, ok := <-entCh: + if !ok { + // channel is closed + return + } + + node, err := NodeFromFileInfo(e.Path, e.Info) + if err != nil { + panic(err) + } + + if node.Type == "file" { + node.blobs, err = arch.SaveFile(p, node) + if err != nil { + panic(err) + } + } + + e.Result <- node + p.Report(Stat{Files: 1}) + case <-done: + // pipeline was cancelled + return + } + } +} + +func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) { + defer wg.Done() + for { + select { + case dir, ok := <-dirCh: + if !ok { + // channel is closed + return + } + debug.Log("Archiver.DirWorker", "save dir %v\n", dir.Path) + + tree := NewTree() + + // wait for all content + for _, ch := range dir.Entries { + node := (<-ch).(*Node) + tree.Insert(node) + + if node.Type == "dir" { + debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs) + } + + for _, blob := range node.blobs { + tree.Map.Insert(blob) + arch.m.Insert(blob) + } + } + + node, err := NodeFromFileInfo(dir.Path, dir.Info) + if err != nil { + node.Error = err.Error() + dir.Result <- node + continue + } + + blob, err := arch.SaveTreeJSON(tree) + if err != nil { + panic(err) + } + debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob) + + node.Subtree = blob.ID + node.blobs = Blobs{blob} + + dir.Result <- node + p.Report(Stat{Dirs: 1}) + case <-done: + // pipeline was cancelled + return + } + } +} + +func compareWithOldTree(newCh <-chan interface{}, oldCh <-chan WalkTreeJob, outCh chan<- interface{}) { + debug.Log("Archiver.compareWithOldTree", "start") + defer func() { + debug.Log("Archiver.compareWithOldTree", "done") + }() + for { + debug.Log("Archiver.compareWithOldTree", "waiting for new job") + newJob, ok := <-newCh + if !ok { + // channel is closed + return + } + + debug.Log("Archiver.compareWithOldTree", "received new job %v", newJob) + oldJob, ok := <-oldCh + if !ok { + // channel is closed + return + } + + debug.Log("Archiver.compareWithOldTree", "received old job %v", oldJob) + + outCh <- newJob + } +} + +func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { + debug.Log("Archiver.Snapshot", "start for %v", paths) + debug.Break("Archiver.Snapshot") + sort.Strings(paths) p.Start() defer p.Done() - sn, err := NewSnapshot(path) + sn, err := NewSnapshot(paths) if err != nil { return nil, nil, err } - sn.Parent = parentSnapshot + // load parent snapshot + // var oldRoot backend.ID + // if parentSnapshot != nil { + // sn.Parent = parentSnapshot + // parentSn, err := LoadSnapshot(arch.s, parentSnapshot) + // if err != nil { + // return nil, nil, err + // } + // oldRoot = parentSn.Tree.Storage + // } + // signal the whole pipeline to stop done := make(chan struct{}) - entCh := make(chan pipe.Entry) - dirCh := make(chan pipe.Dir) - fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry) { - defer wg.Done() - for { - select { - case e, ok := <-entCh: - if !ok { - // channel is closed - return - } - - node, err := NodeFromFileInfo(e.Path, e.Info) - if err != nil { - panic(err) - } - - if node.Type == "file" { - node.blobs, err = arch.SaveFile(p, node) - if err != nil { - panic(err) - } - } - - e.Result <- node - p.Report(Stat{Files: 1}) - case <-done: - // pipeline was cancelled - return - } - } - } - - dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, dirCh <-chan pipe.Dir) { - defer wg.Done() - for { - select { - case dir, ok := <-dirCh: - if !ok { - // channel is closed - return - } - - tree := NewTree() - - // wait for all content - for _, ch := range dir.Entries { - node := (<-ch).(*Node) - tree.Insert(node) - - if node.Type == "dir" { - debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs) - } - - for _, blob := range node.blobs { - tree.Map.Insert(blob) - arch.m.Insert(blob) - } - } - - node, err := NodeFromFileInfo(dir.Path, dir.Info) - if err != nil { - node.Error = err.Error() - dir.Result <- node - continue - } - - blob, err := arch.SaveTreeJSON(tree) - if err != nil { - panic(err) - } - debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob) - - node.Subtree = blob.ID - node.blobs = Blobs{blob} - - dir.Result <- node - p.Report(Stat{Dirs: 1}) - case <-done: - // pipeline was cancelled - return - } - } - } + // if we have an old root, start walker and comparer + // oldTreeCh := make(chan WalkTreeJob) + // if oldRoot != nil { + // // start walking the old tree + // debug.Log("Archiver.Snapshot", "start comparer for old root %v", oldRoot.Str()) + // go WalkTree(arch.s, oldRoot, done, oldTreeCh) + // } var wg sync.WaitGroup + entCh := make(chan pipe.Entry) + dirCh := make(chan pipe.Dir) + jobsCh := make(chan interface{}) + + // split + wg.Add(1) + go func() { + pipe.Split(jobsCh, dirCh, entCh) + close(dirCh) + close(entCh) + wg.Done() + }() + + // run workers for i := 0; i < maxConcurrency; i++ { wg.Add(2) - go fileWorker(&wg, done, entCh) - go dirWorker(&wg, done, dirCh) + go arch.fileWorker(&wg, p, done, entCh) + go arch.dirWorker(&wg, p, done, dirCh) } - resCh, err := pipe.Walk(path, done, entCh, dirCh) + // start walker + resCh, err := pipe.Walk(paths, done, jobsCh) if err != nil { close(done) + + debug.Log("Archiver.Snapshot", "pipe.Walke returned error %v", err) + return nil, nil, err } // wait for all workers to terminate + debug.Log("Archiver.Snapshot", "wait for workers") wg.Wait() - if err != nil { - return nil, nil, err - } + debug.Log("Archiver.Snapshot", "workers terminated") - // wait for top-level node - node := (<-resCh).(*Node) - - // add tree for top-level directory + // add the top-level tree tree := NewTree() - tree.Insert(node) - for _, blob := range node.blobs { - blob = arch.m.Insert(blob) - tree.Map.Insert(blob) + root := (<-resCh).(pipe.Dir) + for i := 0; i < len(paths); i++ { + node := (<-root.Entries[i]).(*Node) + + debug.Log("Archiver.Snapshot", "got toplevel node %v", node) + + tree.Insert(node) + for _, blob := range node.blobs { + blob = arch.m.Insert(blob) + tree.Map.Insert(blob) + } } tb, err := arch.SaveTreeJSON(tree) @@ -565,27 +633,33 @@ func isFile(fi os.FileInfo) bool { return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0 } -func Scan(dir string, p *Progress) (Stat, error) { +func Scan(dirs []string, p *Progress) (Stat, error) { p.Start() defer p.Done() var stat Stat - err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error { - s := Stat{} - if isFile(fi) { - s.Files++ - s.Bytes += uint64(fi.Size()) - } else if fi.IsDir() { - s.Dirs++ + for _, dir := range dirs { + err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error { + s := Stat{} + if isFile(fi) { + s.Files++ + s.Bytes += uint64(fi.Size()) + } else if fi.IsDir() { + s.Dirs++ + } + + p.Report(s) + stat.Add(s) + + // TODO: handle error? + return nil + }) + + if err != nil { + return Stat{}, err } + } - p.Report(s) - stat.Add(s) - - // TODO: handle error? - return nil - }) - - return stat, err + return stat, nil } diff --git a/archiver_test.go b/archiver_test.go index 81f2b80df..1adbb1c61 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -138,7 +138,7 @@ func BenchmarkArchiveDirectory(b *testing.B) { arch, err := restic.NewArchiver(server) ok(b, err) - _, id, err := arch.Snapshot(nil, *benchArchiveDirectory, nil) + _, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) b.Logf("snapshot archived as %v", id) } @@ -147,7 +147,7 @@ func snapshot(t testing.TB, server restic.Server, path string) *restic.Snapshot arch, err := restic.NewArchiver(server) ok(t, err) ok(t, arch.Preload(nil)) - sn, _, err := arch.Snapshot(nil, path, nil) + sn, _, err := arch.Snapshot(nil, []string{path}, nil) ok(t, err) return sn } @@ -215,7 +215,7 @@ func BenchmarkPreload(t *testing.B) { // archive a few files arch, err := restic.NewArchiver(server) ok(t, err) - sn, _, err := arch.Snapshot(nil, *benchArchiveDirectory, nil) + sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) ok(t, err) t.Logf("archived snapshot %v", sn.ID()) @@ -243,7 +243,7 @@ func BenchmarkLoadTree(t *testing.B) { // archive a few files arch, err := restic.NewArchiver(server) ok(t, err) - sn, _, err := arch.Snapshot(nil, *benchArchiveDirectory, nil) + sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) ok(t, err) t.Logf("archived snapshot %v", sn.ID()) diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 9d89b154b..0921a8390 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -1,9 +1,9 @@ package main import ( - "errors" "fmt" "os" + "path/filepath" "strings" "time" @@ -12,7 +12,9 @@ import ( "golang.org/x/crypto/ssh/terminal" ) -type CmdBackup struct{} +type CmdBackup struct { + Parent string `short:"p" long:"parent" description:"use this parent snapshot (default: not set)"` +} func init() { _, err := parser.AddCommand("backup", @@ -167,10 +169,18 @@ func newArchiveProgress(todo restic.Stat) *restic.Progress { } func (cmd CmdBackup) Execute(args []string) error { - if len(args) == 0 || len(args) > 2 { + if len(args) == 0 { return fmt.Errorf("wrong number of parameters, Usage: %s", cmd.Usage()) } + target := make([]string, 0, len(args)) + for _, d := range args { + if a, err := filepath.Abs(d); err == nil { + d = a + } + target = append(target, d) + } + s, err := OpenRepo() if err != nil { return err @@ -178,17 +188,16 @@ func (cmd CmdBackup) Execute(args []string) error { var parentSnapshotID backend.ID - target := args[0] - if len(args) > 1 { - parentSnapshotID, err = s.FindSnapshot(args[1]) + if cmd.Parent != "" { + parentSnapshotID, err = s.FindSnapshot(cmd.Parent) if err != nil { - return fmt.Errorf("invalid id %q: %v", args[1], err) + return fmt.Errorf("invalid id %q: %v", cmd.Parent, err) } fmt.Printf("found parent snapshot %v\n", parentSnapshotID) } - fmt.Printf("scan %s\n", target) + fmt.Printf("scan %v\n", target) stat, err := restic.Scan(target, newScanProgress()) @@ -197,10 +206,6 @@ func (cmd CmdBackup) Execute(args []string) error { // return true // } - if parentSnapshotID != nil { - return errors.New("not implemented") - } - arch, err := restic.NewArchiver(s) if err != nil { fmt.Fprintf(os.Stderr, "err: %v\n", err) @@ -212,16 +217,16 @@ func (cmd CmdBackup) Execute(args []string) error { return nil } - fmt.Printf("loading blobs\n") - pb, err := newLoadBlobsProgress(s) - if err != nil { - return err - } + // fmt.Printf("loading blobs\n") + // pb, err := newLoadBlobsProgress(s) + // if err != nil { + // return err + // } - err = arch.Preload(pb) - if err != nil { - return err - } + // err = arch.Preload(pb) + // if err != nil { + // return err + // } _, id, err := arch.Snapshot(newArchiveProgress(stat), target, parentSnapshotID) if err != nil { diff --git a/cmd/restic/cmd_ls.go b/cmd/restic/cmd_ls.go index b6a98caba..dbfd5abb9 100644 --- a/cmd/restic/cmd_ls.go +++ b/cmd/restic/cmd_ls.go @@ -86,7 +86,7 @@ func (cmd CmdLs) Execute(args []string) error { return err } - fmt.Printf("snapshot of %s at %s:\n", sn.Dir, sn.Time) + fmt.Printf("snapshot of %v at %s:\n", sn.Paths, sn.Time) return print_tree("", s, sn.Tree) } diff --git a/cmd/restic/cmd_snapshots.go b/cmd/restic/cmd_snapshots.go index 5dbe2b75a..53aa81481 100644 --- a/cmd/restic/cmd_snapshots.go +++ b/cmd/restic/cmd_snapshots.go @@ -128,7 +128,16 @@ func (cmd CmdSnapshots) Execute(args []string) error { } for _, sn := range list { - tab.Rows = append(tab.Rows, []interface{}{sn.ID()[:plen], sn.Time.Format(TimeFormat), sn.Hostname, sn.Dir}) + if len(sn.Paths) == 0 { + continue + } + tab.Rows = append(tab.Rows, []interface{}{sn.ID()[:plen], sn.Time.Format(TimeFormat), sn.Hostname, sn.Paths[0]}) + + if len(sn.Paths) > 1 { + for _, path := range sn.Paths { + tab.Rows = append(tab.Rows, []interface{}{"", "", "", path}) + } + } } tab.Write(os.Stdout) diff --git a/pipe/pipe.go b/pipe/pipe.go index c70bfa363..257e41d1e 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -5,6 +5,8 @@ import ( "os" "path/filepath" "sort" + + "github.com/restic/restic/debug" ) type Entry struct { @@ -48,14 +50,15 @@ func isFile(fi os.FileInfo) bool { return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0 } -func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, res chan<- interface{}) error { +func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- interface{}) error { info, err := os.Lstat(path) if err != nil { return err } if !info.IsDir() { - return fmt.Errorf("path is not a directory, cannot walk: %s", path) + jobs <- Entry{Info: info, Path: path, Result: res} + return nil } names, err := readDirNames(path) @@ -78,26 +81,95 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, } if isDir(fi) { - err = walk(subpath, done, entCh, dirCh, ch) + err = walk(subpath, done, jobs, ch) if err != nil { return err } } else { - entCh <- Entry{Info: fi, Path: subpath, Result: ch} + jobs <- Entry{Info: fi, Path: subpath, Result: ch} } } - dirCh <- Dir{Path: path, Info: info, Entries: entries, Result: res} + jobs <- Dir{Path: path, Info: info, Entries: entries, Result: res} return nil } -// Walk takes a path and sends a Job for each file and directory it finds below -// the path. When the channel done is closed, processing stops. -func Walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir) (<-chan interface{}, error) { +// Walk sends a Job for each file and directory it finds below the paths. When +// the channel done is closed, processing stops. +func Walk(paths []string, done chan struct{}, jobs chan<- interface{}) (<-chan interface{}, error) { resCh := make(chan interface{}, 1) - err := walk(path, done, entCh, dirCh, resCh) - close(entCh) - close(dirCh) - return resCh, err + defer func() { + close(resCh) + close(jobs) + debug.Log("pipe.Walk", "output channel closed") + }() + + entries := make([]<-chan interface{}, 0, len(paths)) + for _, path := range paths { + debug.Log("pipe.Walk", "start walker for %v", path) + ch := make(chan interface{}, 1) + entries = append(entries, ch) + err := walk(path, done, jobs, ch) + if err != nil { + return nil, err + } + debug.Log("pipe.Walk", "walker for %v done", path) + } + resCh <- Dir{Entries: entries} + return resCh, nil +} + +// Split feeds all elements read from inChan to dirChan and entChan. +func Split(inChan <-chan interface{}, dirChan chan<- Dir, entChan chan<- Entry) { + debug.Log("pipe.Split", "start") + defer debug.Log("pipe.Split", "done") + + inCh := inChan + dirCh := dirChan + entCh := entChan + + var ( + dir Dir + ent Entry + ) + + // deactivate sending until we received at least one job + dirCh = nil + entCh = nil + for { + select { + case job, ok := <-inCh: + if !ok { + // channel is closed + return + } + + if job == nil { + panic("nil job received") + } + + // disable receiving until the current job has been sent + inCh = nil + + switch j := job.(type) { + case Dir: + dir = j + dirCh = dirChan + case Entry: + ent = j + entCh = entChan + default: + panic(fmt.Sprintf("unknown job type %v", j)) + } + case dirCh <- dir: + // disable sending, re-enable receiving + dirCh = nil + inCh = inChan + case entCh <- ent: + // disable sending, re-enable receiving + entCh = nil + inCh = inChan + } + } } diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 8f71f107a..979fb85b2 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -43,9 +43,9 @@ func statPath(path string) (stats, error) { return s, err } -func TestPipelineWalker(t *testing.T) { +func TestPipelineWalkerWithSplit(t *testing.T) { if *testWalkerPath == "" { - t.Skipf("walkerpah not set, skipping TestPipelineWalker") + t.Skipf("walkerpath not set, skipping TestPipelineWalker") } before, err := statPath(*testWalkerPath) @@ -106,7 +106,92 @@ func TestPipelineWalker(t *testing.T) { go worker(&wg, done, entCh, dirCh) } - resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh) + jobs := make(chan interface{}, 200) + wg.Add(1) + go func() { + pipe.Split(jobs, dirCh, entCh) + close(entCh) + close(dirCh) + wg.Done() + }() + + resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) + ok(t, err) + + // wait for all workers to terminate + wg.Wait() + + // wait for top-level blob + <-resCh + + t.Logf("walked path %s with %d dirs, %d files", *testWalkerPath, + after.dirs, after.files) + + assert(t, before == after, "stats do not match, expected %v, got %v", before, after) +} + +func TestPipelineWalker(t *testing.T) { + if *testWalkerPath == "" { + t.Skipf("walkerpath not set, skipping TestPipelineWalker") + } + + before, err := statPath(*testWalkerPath) + ok(t, err) + + t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath, + before.dirs, before.files) + + after := stats{} + m := sync.Mutex{} + + worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan interface{}) { + defer wg.Done() + for { + select { + case job, ok := <-jobs: + if !ok { + // channel is closed + return + } + assert(t, job != nil, "job is nil") + + switch j := job.(type) { + case pipe.Dir: + // wait for all content + for _, ch := range j.Entries { + <-ch + } + + m.Lock() + after.dirs++ + m.Unlock() + + j.Result <- true + case pipe.Entry: + m.Lock() + after.files++ + m.Unlock() + + j.Result <- true + } + + case <-done: + // pipeline was cancelled + return + } + } + } + + var wg sync.WaitGroup + done := make(chan struct{}) + jobs := make(chan interface{}) + + for i := 0; i < *maxWorkers; i++ { + wg.Add(1) + go worker(&wg, done, jobs) + } + + resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) ok(t, err) // wait for all workers to terminate @@ -123,7 +208,7 @@ func TestPipelineWalker(t *testing.T) { func BenchmarkPipelineWalker(b *testing.B) { if *testWalkerPath == "" { - b.Skipf("walkerpah not set, skipping BenchPipelineWalker") + b.Skipf("walkerpath not set, skipping BenchPipelineWalker") } var max time.Duration @@ -196,7 +281,16 @@ func BenchmarkPipelineWalker(b *testing.B) { go fileWorker(&wg, done, entCh) } - resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh) + jobs := make(chan interface{}, 200) + wg.Add(1) + go func() { + pipe.Split(jobs, dirCh, entCh) + close(entCh) + close(dirCh) + wg.Done() + }() + + resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) ok(b, err) // wait for all workers to terminate diff --git a/restorer.go b/restorer.go index 5cb6e7f0b..28353d984 100644 --- a/restorer.go +++ b/restorer.go @@ -46,7 +46,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob Blob) error { dstpath := filepath.Join(dst, dir, node.Name) if res.Filter == nil || - res.Filter(filepath.Join(res.sn.Dir, dir, node.Name), dstpath, node) { + res.Filter(filepath.Join(dir, node.Name), dstpath, node) { err := CreateNodeAt(node, tree.Map, res.s, dstpath) // Did it fail because of ENOENT? diff --git a/snapshot.go b/snapshot.go index 87c9ae176..e1460343d 100644 --- a/snapshot.go +++ b/snapshot.go @@ -13,7 +13,7 @@ type Snapshot struct { Time time.Time `json:"time"` Parent backend.ID `json:"parent,omitempty"` Tree Blob `json:"tree"` - Dir string `json:"dir"` + Paths []string `json:"paths"` Hostname string `json:"hostname,omitempty"` Username string `json:"username,omitempty"` UID uint32 `json:"uid,omitempty"` @@ -24,15 +24,16 @@ type Snapshot struct { id backend.ID // plaintext ID, used during restore } -func NewSnapshot(dir string) (*Snapshot, error) { - d, err := filepath.Abs(dir) - if err != nil { - d = dir +func NewSnapshot(paths []string) (*Snapshot, error) { + for i, path := range paths { + if p, err := filepath.Abs(path); err != nil { + paths[i] = p + } } sn := &Snapshot{ - Dir: d, - Time: time.Now(), + Paths: paths, + Time: time.Now(), } hn, err := os.Hostname() @@ -59,7 +60,7 @@ func LoadSnapshot(s Server, id backend.ID) (*Snapshot, error) { } func (sn Snapshot) String() string { - return fmt.Sprintf("", sn.Dir, sn.Time) + return fmt.Sprintf("", sn.Paths, sn.Time) } func (sn Snapshot) ID() backend.ID { diff --git a/snapshot_test.go b/snapshot_test.go index 887c1a209..515b8bdbf 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -9,7 +9,7 @@ import ( func testSnapshot(t *testing.T, s restic.Server) { var err error - sn, err := restic.NewSnapshot("/home/foobar") + sn, err := restic.NewSnapshot([]string{"/home/foobar"}) ok(t, err) // sn.Tree, err = restic.Blob{ID: backend.ParseID("c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2")} // ok(t, err) diff --git a/walk.go b/walk.go new file mode 100644 index 000000000..d97ab09b6 --- /dev/null +++ b/walk.go @@ -0,0 +1,55 @@ +package restic + +import ( + "path/filepath" + + "github.com/restic/restic/backend" + "github.com/restic/restic/debug" +) + +type WalkTreeJob struct { + Path string + Error error + + Node *Node + Tree *Tree +} + +func walkTree(s Server, path string, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("walkTree", "start on %q (%v)", path, id.Str()) + // load tree + t, err := LoadTree(s, id) + if err != nil { + jobCh <- WalkTreeJob{Path: path, Error: err} + return + } + + for _, node := range t.Nodes { + p := filepath.Join(path, node.Name) + if node.Type == "dir" { + blob, err := t.Map.FindID(node.Subtree) + if err != nil { + jobCh <- WalkTreeJob{Path: p, Error: err} + continue + } + walkTree(s, p, blob.Storage, done, jobCh) + } else { + jobCh <- WalkTreeJob{Path: p, Node: node} + } + } + + if path != "" { + jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t} + } + debug.Log("walkTree", "done for %q (%v)", path, id.Str()) +} + +// WalkTree walks the tree specified by ID recursively and sends a job for each +// file and directory it finds. When the channel done is closed, processing +// stops. +func WalkTree(server Server, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("WalkTree", "start on %v", id.Str()) + walkTree(server, "", id, done, jobCh) + close(jobCh) + debug.Log("WalkTree", "done", id.Str()) +} diff --git a/walk_test.go b/walk_test.go new file mode 100644 index 000000000..0684793e4 --- /dev/null +++ b/walk_test.go @@ -0,0 +1,83 @@ +package restic_test + +import ( + "flag" + "path/filepath" + "testing" + + "github.com/restic/restic" + "github.com/restic/restic/pipe" +) + +var testWalkDirectory = flag.String("test.walkdir", ".", "test walking a directory (globbing pattern, default: .)") + +func TestWalkTree(t *testing.T) { + dirs, err := filepath.Glob(*testWalkDirectory) + ok(t, err) + + be := setupBackend(t) + defer teardownBackend(t, be) + key := setupKey(t, be, "geheim") + server := restic.NewServerWithKey(be, key) + + // archive a few files + arch, err := restic.NewArchiver(server) + ok(t, err) + sn, _, err := arch.Snapshot(nil, dirs, nil) + ok(t, err) + + // start benchmark + // t.ResetTimer() + + // for i := 0; i < t.N; i++ { + + done := make(chan struct{}) + + // start tree walker + treeJobs := make(chan restic.WalkTreeJob) + go restic.WalkTree(server, sn.Tree.Storage, done, treeJobs) + + // start filesystem walker + fsJobs := make(chan interface{}) + go pipe.Walk(dirs, done, fsJobs) + + for { + // receive fs job + fsJob, fsChOpen := <-fsJobs + assert(t, !fsChOpen || fsJob != nil, + "received nil job from filesystem: %v %v", fsJob, fsChOpen) + + var path string + fsEntries := 1 + switch j := fsJob.(type) { + case pipe.Dir: + path = j.Path + fsEntries = len(j.Entries) + case pipe.Entry: + path = j.Path + } + + // receive tree job + treeJob, treeChOpen := <-treeJobs + treeEntries := 1 + + if treeJob.Tree != nil { + treeEntries = len(treeJob.Tree.Nodes) + } + + assert(t, fsChOpen == treeChOpen, + "one channel closed too early: fsChOpen %v, treeChOpen %v", + fsChOpen, treeChOpen) + + if !fsChOpen || !treeChOpen { + break + } + + assert(t, filepath.Base(path) == filepath.Base(treeJob.Path), + "paths do not match: %q != %q", filepath.Base(path), filepath.Base(treeJob.Path)) + + assert(t, fsEntries == treeEntries, + "wrong number of entries: %v != %v", fsEntries, treeEntries) + } + // } +}