From bd105b69a8c5cef65d1fc1ca4255c94a0758186b Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 15:30:08 +0100
Subject: [PATCH 01/12] Add debug() function to khepri cmd and lib

---
 cmd/khepri/debug.go |  2 +-
 debug.go            | 38 ++++++++++++++++++++++++++++++++++++++
 debug_release.go    |  5 +++++
 3 files changed, 44 insertions(+), 1 deletion(-)
 create mode 100644 debug.go
 create mode 100644 debug_release.go

diff --git a/cmd/khepri/debug.go b/cmd/khepri/debug.go
index 687f0f162..bc60b8dad 100644
--- a/cmd/khepri/debug.go
+++ b/cmd/khepri/debug.go
@@ -26,7 +26,7 @@ func initDebugLogger() *log.Logger {
 
 	// open logger
 	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
-	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s", filename)
+	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s\n", filename)
 	l.Printf("khepri %s", version)
 
 	return l
diff --git a/debug.go b/debug.go
new file mode 100644
index 000000000..cd4141dc3
--- /dev/null
+++ b/debug.go
@@ -0,0 +1,38 @@
+// +build debug
+
+package khepri
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+var version = "compiled manually"
+var debugLogger = initDebugLogger()
+
+func initDebugLogger() *log.Logger {
+	// create new log file
+	filename := fmt.Sprintf("khepri-debug-%d-%s",
+		os.Getpid(), time.Now().Format("20060201-150405"))
+	f, err := os.OpenFile(filepath.Join(os.TempDir(), filename),
+		os.O_WRONLY|os.O_CREATE, 0600)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
+		os.Exit(2)
+	}
+
+	// open logger
+	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
+	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s\n", filename)
+	l.Printf("khepri %s", version)
+
+	return l
+}
+
+func debug(fmt string, args ...interface{}) {
+	debugLogger.Printf(fmt, args...)
+}
diff --git a/debug_release.go b/debug_release.go
new file mode 100644
index 000000000..21f2b24a5
--- /dev/null
+++ b/debug_release.go
@@ -0,0 +1,5 @@
+// +build !debug
+
+package khepri
+
+func debug(fmt string, args ...interface{}) {}
From 7006e13ca963de9921bbab0b36c1d99554e220d6 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 15:30:37 +0100
Subject: [PATCH 02/12] Add dependencies

---
 cmd/khepri/Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/khepri/Makefile b/cmd/khepri/Makefile
index dab9cbb16..cad3f7917 100644
--- a/cmd/khepri/Makefile
+++ b/cmd/khepri/Makefile
@@ -8,7 +8,7 @@ TAGS =
 
 all: khepri
 
-khepri: *.go
+khepri: *.go $(wildcard ../../*.go) $(wildcard ../../*/*.go)
 	go build $(TAGS) -ldflags "$(LDFLAGS)"
 
 debug: TAGS=-tags debug

From cf33b015823433095f981e6ae711fc6597f95ee7 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 15:40:04 +0100
Subject: [PATCH 03/12] Load config.mk in Makefile

---
 cmd/khepri/.gitignore | 1 +
 cmd/khepri/Makefile   | 5 ++++-
 2 files changed, 5 insertions(+), 1 deletion(-)
 create mode 100644 cmd/khepri/.gitignore

diff --git a/cmd/khepri/.gitignore b/cmd/khepri/.gitignore
new file mode 100644
index 000000000..aee2e4ce1
--- /dev/null
+++ b/cmd/khepri/.gitignore
@@ -0,0 +1 @@
+config.mk
diff --git a/cmd/khepri/Makefile b/cmd/khepri/Makefile
index cad3f7917..66819e525 100644
--- a/cmd/khepri/Makefile
+++ b/cmd/khepri/Makefile
@@ -6,12 +6,15 @@ TAGS =
 
 .PHONY: all clean debug
 
+# include config file if it exists
+-include $(CURDIR)/config.mk
+
 all: khepri
 
 khepri: *.go $(wildcard ../../*.go) $(wildcard ../../*/*.go)
 	go build $(TAGS) -ldflags "$(LDFLAGS)"
 
-debug: TAGS=-tags debug
+debug: TAGS=-tags debug_cmd
 debug: khepri
 
 clean:
From 616a2c749dcf495f1497bdc508160cc6aace69e8 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 15:40:28 +0100
Subject: [PATCH 04/12] Use different tags for debug log

---
 cmd/khepri/debug.go         |  8 ++++----
 cmd/khepri/debug_release.go |  2 +-
 debug.go                    | 10 ++++------
 3 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/cmd/khepri/debug.go b/cmd/khepri/debug.go
index bc60b8dad..dc978a51a 100644
--- a/cmd/khepri/debug.go
+++ b/cmd/khepri/debug.go
@@ -1,4 +1,4 @@
-// +build debug
+// +build debug_cmd
 
 package main
 
@@ -17,8 +17,8 @@ func initDebugLogger() *log.Logger {
 	// create new log file
 	filename := fmt.Sprintf("khepri-debug-%d-%s",
 		os.Getpid(), time.Now().Format("20060201-150405"))
-	f, err := os.OpenFile(filepath.Join(os.TempDir(), filename),
-		os.O_WRONLY|os.O_CREATE, 0600)
+	path := filepath.Join(os.TempDir(), filename)
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
 		os.Exit(2)
@@ -26,7 +26,7 @@ func initDebugLogger() *log.Logger {
 
 	// open logger
 	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
-	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s\n", filename)
+	fmt.Fprintf(os.Stderr, "debug log for khepri command activated, writing log file %s\n", path)
 	l.Printf("khepri %s", version)
 
 	return l
diff --git a/cmd/khepri/debug_release.go b/cmd/khepri/debug_release.go
index 77f5c1b76..ef42f0638 100644
--- a/cmd/khepri/debug_release.go
+++ b/cmd/khepri/debug_release.go
@@ -1,4 +1,4 @@
-// +build !debug
+// +build !debug_cmd
 
 package main
 
diff --git a/debug.go b/debug.go
index cd4141dc3..80952b3e7 100644
--- a/debug.go
+++ b/debug.go
@@ -11,15 +11,14 @@ import (
 	"time"
 )
 
-var version = "compiled manually"
 var debugLogger = initDebugLogger()
 
 func initDebugLogger() *log.Logger {
 	// create new log file
-	filename := fmt.Sprintf("khepri-debug-%d-%s",
+	filename := fmt.Sprintf("khepri-lib-debug-%d-%s",
 		os.Getpid(), time.Now().Format("20060201-150405"))
-	f, err := os.OpenFile(filepath.Join(os.TempDir(), filename),
-		os.O_WRONLY|os.O_CREATE, 0600)
+	path := filepath.Join(os.TempDir(), filename)
+	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "unable to create debug log file: %v", err)
 		os.Exit(2)
@@ -27,8 +26,7 @@ func initDebugLogger() *log.Logger {
 
 	// open logger
 	l := log.New(io.MultiWriter(os.Stderr, f), "DEBUG: ", log.LstdFlags)
-	fmt.Fprintf(os.Stderr, "logging activated, writing log file %s\n", filename)
-	l.Printf("khepri %s", version)
+	fmt.Fprintf(os.Stderr, "debug log for khepri library activated, writing log file %s\n", path)
 
 	return l
 }
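The split into debug and debug_cmd tags lets the library and the command be instrumented independently: building with -tags debug enables the library log, -tags debug_cmd the command log, and a release build (no tags) compiles the empty stub, so calls to debug() cost nothing. A minimal sketch of the same build-tag technique in isolation (the file names and the trace tag are illustrative, not part of this series):

    // trace_on.go
    // +build trace

    package example

    import "log"

    // With -tags trace, trace() forwards to the standard logger.
    func trace(format string, args ...interface{}) {
    	log.Printf("TRACE: "+format, args...)
    }

    // trace_off.go
    // +build !trace

    package example

    // Without the tag, trace() is an empty function; the compiler can
    // inline it and eliminate the call sites entirely.
    func trace(format string, args ...interface{}) {}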
From 4a3a6861e20bbf3eed9913a4ab84eec73648b3dc Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 21:29:11 +0100
Subject: [PATCH 05/12] Scan directory first, then back up. Add stats.

---
 archiver.go              | 156 ++++++++++++++++++++++++---------------
 cmd/khepri/cmd_backup.go |  71 +++++++++++++++++-
 tree.go                  |   2 +
 3 files changed, 168 insertions(+), 61 deletions(-)

diff --git a/archiver.go b/archiver.go
index 15dbb3f43..acc2b9a1f 100644
--- a/archiver.go
+++ b/archiver.go
@@ -13,8 +13,20 @@ type Archiver struct {
 	ch   *ContentHandler
 	smap *StorageMap // blobs used for the current snapshot
 
+	Stats Stats
+
 	Error  func(dir string, fi os.FileInfo, err error) error
 	Filter func(item string, fi os.FileInfo) bool
+
+	ScannerUpdate func(stats Stats)
+	SaveUpdate    func(stats Stats)
+}
+
+type Stats struct {
+	Files       int
+	Directories int
+	Other       int
+	Bytes       uint64
 }
 
 func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
@@ -25,6 +37,9 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	arch.Error = func(string, os.FileInfo, error) error { return err }
 	// allow all files
 	arch.Filter = func(string, os.FileInfo) bool { return true }
+	// do nothing
+	arch.ScannerUpdate = func(Stats) {}
+	arch.SaveUpdate = func(Stats) {}
 
 	arch.smap = NewStorageMap()
 	arch.ch, err = NewContentHandler(be, key)
@@ -65,10 +80,10 @@ func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error)
 	return blob, nil
 }
 
-func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
+func (arch *Archiver) SaveFile(node *Node) error {
 	blobs, err := arch.ch.SaveFile(node.path, uint(node.Size))
 	if err != nil {
-		return nil, arch.Error(node.path, nil, err)
+		return arch.Error(node.path, nil, err)
 	}
 
 	node.Content = make([]backend.ID, len(blobs))
@@ -77,23 +92,20 @@ func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
 		arch.smap.Insert(blob)
 	}
 
-	return blobs, err
+	return err
 }
 
-func (arch *Archiver) ImportDir(dir string) (Tree, error) {
+func (arch *Archiver) loadTree(dir string) (*Tree, error) {
+	// open and list path
 	fd, err := os.Open(dir)
 	defer fd.Close()
 	if err != nil {
-		return nil, arch.Error(dir, nil, err)
+		return nil, err
 	}
 
 	entries, err := fd.Readdir(-1)
 	if err != nil {
-		return nil, arch.Error(dir, nil, err)
+		return nil, err
 	}
 
-	if len(entries) == 0 {
-		return nil, nil
-	}
-
 	tree := Tree{}
@@ -107,71 +119,97 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) {
 		node, err := NodeFromFileInfo(path, entry)
 		if err != nil {
-			return nil, arch.Error(dir, entry, err)
+			// TODO: error processing
+			return nil, err
 		}
 
 		tree = append(tree, node)
 
 		if entry.IsDir() {
-			subtree, err := arch.ImportDir(path)
+			node.Tree, err = arch.loadTree(path)
 			if err != nil {
 				return nil, err
 			}
-
-			blob, err := arch.SaveJSON(backend.Tree, subtree)
-			if err != nil {
-				return nil, err
-			}
-
-			node.Subtree = blob.ID
-
-			continue
 		}
 
-		if node.Type == "file" {
-			_, err := arch.SaveFile(node)
-			if err != nil {
-				return nil, arch.Error(path, entry, err)
-			}
+		switch node.Type {
+		case "file":
+			arch.Stats.Files++
+			arch.Stats.Bytes += node.Size
+		case "dir":
+			arch.Stats.Directories++
+		default:
+			arch.Stats.Other++
 		}
 	}
 
-	return tree, nil
+	arch.ScannerUpdate(arch.Stats)
+
+	return &tree, nil
 }
 
-func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
+func (arch *Archiver) LoadTree(path string) (*Tree, error) {
+	fi, err := os.Lstat(path)
+	if err != nil {
+		return nil, err
+	}
+
+	node, err := NodeFromFileInfo(path, fi)
+	if err != nil {
+		return nil, err
+	}
+
+	if node.Type != "dir" {
+		arch.Stats.Files = 1
+		arch.Stats.Bytes = node.Size
+		arch.ScannerUpdate(arch.Stats)
+		return &Tree{node}, nil
+	}
+
+	arch.Stats.Directories = 1
+	node.Tree, err = arch.loadTree(path)
+	if err != nil {
+		return nil, err
+	}
+
+	arch.ScannerUpdate(arch.Stats)
+
+	return &Tree{node}, nil
+}
+
+func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
+	for _, node := range *t {
+		if node.Tree != nil && node.Subtree == nil {
+			b, err := arch.saveTree(node.Tree)
+			if err != nil {
+				return nil, err
+			}
+			node.Subtree = b.ID
+			arch.SaveUpdate(Stats{Directories: 1})
+		} else if node.Type == "file" && len(node.Content) == 0 {
+			err := arch.SaveFile(node)
+			if err != nil {
+				return nil, err
+			}
+
+			arch.SaveUpdate(Stats{Files: 1, Bytes: node.Size})
+		} else {
+			arch.SaveUpdate(Stats{Other: 1})
+		}
+	}
+
+	blob, err := arch.SaveJSON(backend.Tree, t)
+	if err != nil {
+		return nil, err
+	}
+
+	return blob, nil
+}
+
+func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) {
 	sn := NewSnapshot(dir)
 
-	fi, err := os.Lstat(dir)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	node, err := NodeFromFileInfo(dir, fi)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	if node.Type == "dir" {
-		tree, err := arch.ImportDir(dir)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		blob, err := arch.SaveJSON(backend.Tree, tree)
-		if err != nil {
-			return nil, nil, err
-		}
-
-		node.Subtree = blob.ID
-	} else if node.Type == "file" {
-		_, err := arch.SaveFile(node)
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-
-	blob, err := arch.SaveJSON(backend.Tree, &Tree{node})
+	blob, err := arch.saveTree(t)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -185,5 +223,5 @@ func (arch *Archiver) Import(dir string) (*Snapshot, *Blob, error) {
 		return nil, nil, err
 	}
 
-	return sn, blob, nil
+	return sn, blob.Storage, nil
 }
diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go
index 6fdc6c761..da2fef8ab 100644
--- a/cmd/khepri/cmd_backup.go
+++ b/cmd/khepri/cmd_backup.go
@@ -4,11 +4,41 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"strings"
 
 	"github.com/fd0/khepri"
 	"github.com/fd0/khepri/backend"
+	"golang.org/x/crypto/ssh/terminal"
 )
 
+func format_bytes(c uint64) string {
+	b := float64(c)
+
+	switch {
+	case c > 1<<40:
+		return fmt.Sprintf("%.3f TiB", b/(1<<40))
+	case c > 1<<30:
+		return fmt.Sprintf("%.3f GiB", b/(1<<30))
+	case c > 1<<20:
+		return fmt.Sprintf("%.3f MiB", b/(1<<20))
+	case c > 1<<10:
+		return fmt.Sprintf("%.3f KiB", b/(1<<10))
+	default:
+		return fmt.Sprintf("%d B", c)
+	}
+}
+
+func print_tree2(indent int, t *khepri.Tree) {
+	for _, node := range *t {
+		if node.Tree != nil {
+			fmt.Printf("%s%s/\n", strings.Repeat(" ", indent), node.Name)
+			print_tree2(indent+1, node.Tree)
+		} else {
+			fmt.Printf("%s%s\n", strings.Repeat(" ", indent), node.Name)
+		}
+	}
+}
+
 func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
 	if len(args) != 1 {
 		return errors.New("usage: backup [dir|file]")
@@ -25,12 +55,49 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
 		return err
 	}
 
-	_, blob, err := arch.Import(target)
+	fmt.Printf("scanning %s\n", target)
+
+	if terminal.IsTerminal(int(os.Stdout.Fd())) {
+		arch.ScannerUpdate = func(stats khepri.Stats) {
+			fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
+		}
+	}
+
+	// TODO: add filter
+	// arch.Filter = func(dir string, fi os.FileInfo) bool {
+	// 	return true
+	// }
+
+	t, err := arch.LoadTree(target)
 	if err != nil {
+		fmt.Fprintf(os.Stderr, "error: %v\n", err)
 		return err
 	}
 
-	fmt.Printf("snapshot %s saved\n", blob.Storage)
+	fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes))
+
+	stats := khepri.Stats{}
+	if terminal.IsTerminal(int(os.Stdout.Fd())) {
+		arch.SaveUpdate = func(s khepri.Stats) {
+			stats.Files += s.Files
+			stats.Directories += s.Directories
+			stats.Other += s.Other
+			stats.Bytes += s.Bytes
+
+			fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s",
+				float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
+				stats.Directories, arch.Stats.Directories,
+				stats.Files, arch.Stats.Files,
+				format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes))
+		}
+	}
+
+	sn, id, err := arch.Snapshot(target, t)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "error: %v\n", err)
+	}
+
+	fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
 
 	return nil
 }
diff --git a/tree.go b/tree.go
index ad5f64597..778388122 100644
--- a/tree.go
+++ b/tree.go
@@ -34,6 +34,8 @@ type Node struct {
 	Content []backend.ID `json:"content,omitempty"`
 	Subtree backend.ID   `json:"subtree,omitempty"`
 
+	Tree *Tree `json:"-"`
+
 	path string
 }
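Patch 05 turns backup into two phases: LoadTree walks the filesystem and only records nodes and totals in Stats, then Snapshot walks the in-memory tree and uploads it, so upload progress can be reported against a known total. A sketch of how a caller wires the two phases together, assuming the API exactly as in the patch (runBackup itself is an illustrative helper, not part of the series):

    func runBackup(be backend.Server, key *khepri.Key, target string) error {
    	arch, err := khepri.NewArchiver(be, key)
    	if err != nil {
    		return err
    	}

    	// phase 1: scan only; totals accumulate in arch.Stats
    	arch.ScannerUpdate = func(s khepri.Stats) {
    		fmt.Printf("\rscanning: %d dirs, %d files, %d bytes", s.Directories, s.Files, s.Bytes)
    	}
    	tree, err := arch.LoadTree(target)
    	if err != nil {
    		return err
    	}

    	// phase 2: upload; each callback carries the delta for one item,
    	// so progress can be shown against the phase-1 totals
    	var done uint64
    	arch.SaveUpdate = func(s khepri.Stats) {
    		done += s.Bytes
    		fmt.Printf("\rsaved %d of %d bytes", done, arch.Stats.Bytes)
    	}
    	sn, id, err := arch.Snapshot(target, tree)
    	if err != nil {
    		return err
    	}
    	fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
    	return nil
    }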
From 1ac4f92299385c7532d778846d53c195840007e8 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 21:41:05 +0100
Subject: [PATCH 06/12] Add benchmark for Chunk+Encrypt

---
 archiver_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 53 insertions(+)
 create mode 100644 archiver_test.go

diff --git a/archiver_test.go b/archiver_test.go
new file mode 100644
index 000000000..529b2bb5f
--- /dev/null
+++ b/archiver_test.go
@@ -0,0 +1,53 @@
+package khepri_test
+
+import (
+	"bytes"
+	"io"
+	"math/rand"
+	"testing"
+
+	"github.com/fd0/khepri/chunker"
+)
+
+func get_random(seed, count int) []byte {
+	buf := make([]byte, count)
+
+	rnd := rand.New(rand.NewSource(23))
+	for i := 0; i < count; i += 4 {
+		r := rnd.Uint32()
+		buf[i] = byte(r)
+		buf[i+1] = byte(r >> 8)
+		buf[i+2] = byte(r >> 16)
+		buf[i+3] = byte(r >> 24)
+	}
+
+	return buf
+}
+
+func BenchmarkChunkEncrypt(b *testing.B) {
+	data := get_random(23, 10<<20) // 10MiB
+
+	be := setupBackend(b)
+	defer teardownBackend(b, be)
+	key := setupKey(b, be, "geheim")
+
+	b.ResetTimer()
+	b.SetBytes(int64(len(data)))
+
+	for i := 0; i < b.N; i++ {
+		ch := chunker.New(bytes.NewReader(data))
+
+		for {
+			chunk_data, err := ch.Next()
+
+			if err == io.EOF {
+				break
+			}
+
+			ok(b, err)
+
+			_, err = key.Encrypt(chunk_data.Data)
+			ok(b, err)
+		}
+	}
+}
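Because the benchmark calls b.SetBytes(int64(len(data))), running `go test -bench ChunkEncrypt` reports combined chunking-plus-encryption throughput in MB/s. One quirk worth knowing when reusing get_random: it ignores its seed parameter and always seeds the generator with the constant 23, so every caller receives identical pseudo-random data; the benchmark happens to pass 23 anyway, so its results are unaffected.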
From 94d1482888f73d43d0cec40eeedbc45cc95dbeec Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 16 Nov 2014 22:50:20 +0100
Subject: [PATCH 07/12] Save multiple files in parallel

---
 archiver.go              | 74 +++++++++++++++++++++++++++++++++-------
 cmd/khepri/cmd_backup.go |  4 +++
 cmd/khepri/main.go       |  4 +++
 contenthandler.go        | 11 ++++++
 4 files changed, 81 insertions(+), 12 deletions(-)

diff --git a/archiver.go b/archiver.go
index acc2b9a1f..028e94162 100644
--- a/archiver.go
+++ b/archiver.go
@@ -3,16 +3,25 @@ package khepri
 import (
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/fd0/khepri/backend"
 )
 
+const (
+	maxConcurrentFiles = 32
+)
+
 type Archiver struct {
-	be   backend.Server
-	key  *Key
-	ch   *ContentHandler
+	be  backend.Server
+	key *Key
+	ch  *ContentHandler
+
+	m    sync.Mutex
 	smap *StorageMap // blobs used for the current snapshot
 
+	fileToken chan struct{}
+
 	Stats Stats
 
 	Error  func(dir string, fi os.FileInfo, err error) error
@@ -20,6 +29,8 @@ type Archiver struct {
 
 	ScannerUpdate func(stats Stats)
 	SaveUpdate    func(stats Stats)
+
+	sum sync.Mutex // for SaveUpdate
 }
 
 type Stats struct {
@@ -31,7 +42,16 @@ type Stats struct {
 func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	var err error
-	arch := &Archiver{be: be, key: key}
+	arch := &Archiver{
+		be:        be,
+		key:       key,
+		fileToken: make(chan struct{}, maxConcurrentFiles),
+	}
+
+	// fill file token
+	for i := 0; i < maxConcurrentFiles; i++ {
+		arch.fileToken <- struct{}{}
+	}
 
 	// abort on all errors
 	arch.Error = func(string, os.FileInfo, error) error { return err }
@@ -39,7 +59,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	arch.Filter = func(string, os.FileInfo) bool { return true }
 	// do nothing
 	arch.ScannerUpdate = func(Stats) {}
-	arch.SaveUpdate = func(Stats) {}
 
 	arch.smap = NewStorageMap()
 	arch.ch, err = NewContentHandler(be, key)
@@ -56,6 +75,14 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	return arch, nil
 }
 
+func (arch *Archiver) saveUpdate(stats Stats) {
+	if arch.SaveUpdate != nil {
+		arch.sum.Lock()
+		defer arch.sum.Unlock()
+		arch.SaveUpdate(stats)
+	}
+}
+
 func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
 	blob, err := arch.ch.Save(t, data)
 	if err != nil {
@@ -63,6 +90,8 @@ func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
 	}
 
 	// store blob in storage map for current snapshot
+	arch.m.Lock()
+	defer arch.m.Unlock()
 	arch.smap.Insert(blob)
 
 	return blob, nil
@@ -75,6 +104,8 @@ func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error)
 	}
 
 	// store blob in storage map for current snapshot
+	arch.m.Lock()
+	defer arch.m.Unlock()
 	arch.smap.Insert(blob)
 
 	return blob, nil
@@ -89,7 +120,9 @@ func (arch *Archiver) SaveFile(node *Node) error {
 	node.Content = make([]backend.ID, len(blobs))
 	for i, blob := range blobs {
 		node.Content[i] = blob.ID
+		arch.m.Lock()
 		arch.smap.Insert(blob)
+		arch.m.Unlock()
 	}
 
 	return err
@@ -178,6 +211,8 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
 }
 
 func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
+	var wg sync.WaitGroup
+
 	for _, node := range *t {
 		if node.Tree != nil && node.Subtree == nil {
 			b, err := arch.saveTree(node.Tree)
@@ -185,19 +220,34 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
 				return nil, err
 			}
 			node.Subtree = b.ID
-			arch.SaveUpdate(Stats{Directories: 1})
+			arch.saveUpdate(Stats{Directories: 1})
 		} else if node.Type == "file" && len(node.Content) == 0 {
-			err := arch.SaveFile(node)
-			if err != nil {
-				return nil, err
-			}
+			// start goroutine
+			wg.Add(1)
+			go func(n *Node) {
+				defer wg.Done()
 
-			arch.SaveUpdate(Stats{Files: 1, Bytes: node.Size})
+				// get token
+				token := <-arch.fileToken
+				defer func() {
+					arch.fileToken <- token
+				}()
+
+				// debug("start: %s", n.path)
+
+				// TODO: handle error
+				arch.SaveFile(n)
+				arch.saveUpdate(Stats{Files: 1, Bytes: n.Size})
+
+				// debug("done: %s", n.path)
+			}(node)
 		} else {
-			arch.SaveUpdate(Stats{Other: 1})
+			arch.saveUpdate(Stats{Other: 1})
 		}
 	}
 
+	wg.Wait()
+
 	blob, err := arch.SaveJSON(backend.Tree, t)
 	if err != nil {
 		return nil, err
diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go
index da2fef8ab..0e3972186 100644
--- a/cmd/khepri/cmd_backup.go
+++ b/cmd/khepri/cmd_backup.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/fd0/khepri"
 	"github.com/fd0/khepri/backend"
@@ -92,12 +93,15 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
 		}
 	}
 
+	start := time.Now()
 	sn, id, err := arch.Snapshot(target, t)
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "error: %v\n", err)
 	}
 
 	fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
+	duration := time.Now().Sub(start)
+	fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))
 
 	return nil
 }
diff --git a/cmd/khepri/main.go b/cmd/khepri/main.go
index 719a84ee8..cd1c0a777 100644
--- a/cmd/khepri/main.go
+++ b/cmd/khepri/main.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"net/url"
 	"os"
+	"runtime"
 	"sort"
 	"strings"
 
@@ -128,6 +129,9 @@ func init() {
 	commands["snapshots"] = commandSnapshots
 	commands["cat"] = commandCat
 	commands["ls"] = commandLs
+
+	// set GOMAXPROCS to number of CPUs
+	runtime.GOMAXPROCS(runtime.NumCPU())
 }
 
 func main() {
diff --git a/contenthandler.go b/contenthandler.go
index 271df9b76..514bc0fe3 100644
--- a/contenthandler.go
+++ b/contenthandler.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"sync"
 
 	"github.com/fd0/khepri/backend"
 	"github.com/fd0/khepri/chunker"
@@ -15,6 +16,7 @@ type ContentHandler struct {
 	be  backend.Server
 	key *Key
 
+	m       sync.Mutex
 	content *StorageMap
 }
 
@@ -36,6 +38,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
 		return nil, err
 	}
 
+	ch.m.Lock()
+	defer ch.m.Unlock()
 	ch.content.Merge(sn.StorageMap)
 	return sn, nil
 }
@@ -49,6 +53,9 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
 		if err != nil {
 			return
 		}
+
+		ch.m.Lock()
+		defer ch.m.Unlock()
 		ch.content.Merge(sn.StorageMap)
 	})
 	if err != nil {
@@ -65,6 +72,8 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
 	id := backend.Hash(data)
 
 	// test if the hash is already in the backend
+	ch.m.Lock()
+	defer ch.m.Unlock()
 	blob := ch.content.Find(id)
 	if blob != nil {
 		return blob, nil
@@ -177,6 +186,8 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
 	}
 
 	// lookup storage hash
+	ch.m.Lock()
+	defer ch.m.Unlock()
 	blob := ch.content.Find(id)
 	if blob == nil {
 		return nil, errors.New("Storage ID not found")
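The fileToken channel is a counting semaphore: NewArchiver pre-fills it with maxConcurrentFiles tokens, each file-saving goroutine receives a token before doing any work and sends it back when finished, so at most 32 files are read and uploaded at once no matter how large the tree is. The same pattern in isolation (runBounded is a hypothetical helper, not part of the patch; it uses the equivalent buffered-channel form where sending acquires and receiving releases):

    // runBounded applies fn to every item, running at most limit
    // goroutines at any one time.
    func runBounded(items []string, limit int, fn func(string)) {
    	sem := make(chan struct{}, limit)
    	var wg sync.WaitGroup

    	for _, item := range items {
    		wg.Add(1)
    		sem <- struct{}{} // acquire: blocks while limit goroutines run
    		go func(item string) {
    			defer wg.Done()
    			defer func() { <-sem }() // release
    			fn(item)
    		}(item)
    	}

    	wg.Wait()
    }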
From fe92062735f6885507ddfbbb06c77fe7d2ad1497 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 17 Nov 2014 23:28:51 +0100
Subject: [PATCH 08/12] Move SaveFile to Archiver, add blobs status

---
 archiver.go       | 58 +++++++++++++++++++++++++++++++++++++------
 contenthandler.go | 53 ------------------------------------------
 2 files changed, 51 insertions(+), 60 deletions(-)

diff --git a/archiver.go b/archiver.go
index 028e94162..182ce72cb 100644
--- a/archiver.go
+++ b/archiver.go
@@ -1,11 +1,14 @@
 package khepri
 
 import (
+	"io"
+	"io/ioutil"
 	"os"
 	"path/filepath"
 	"sync"
 
 	"github.com/fd0/khepri/backend"
+	"github.com/fd0/khepri/chunker"
 )
 
 const (
@@ -111,10 +114,55 @@ func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error)
 	return blob, nil
 }
 
+// SaveFile stores the content of the file on the backend as a Blob by calling
+// Save for each chunk.
 func (arch *Archiver) SaveFile(node *Node) error {
-	blobs, err := arch.ch.SaveFile(node.path, uint(node.Size))
+	file, err := os.Open(node.path)
+	defer file.Close()
 	if err != nil {
-		return arch.Error(node.path, nil, err)
+		return err
+	}
+
+	var blobs Blobs
+
+	// if the file is small enough, store it directly
+	if node.Size < chunker.MinSize {
+		buf, err := ioutil.ReadAll(file)
+		if err != nil {
+			return err
+		}
+
+		blob, err := arch.ch.Save(backend.Data, buf)
+		if err != nil {
+			return err
+		}
+
+		arch.saveUpdate(Stats{Bytes: blob.Size})
+
+		blobs = Blobs{blob}
+	} else {
+		// else store all chunks
+		chunker := chunker.New(file)
+
+		for {
+			chunk, err := chunker.Next()
+			if err == io.EOF {
+				break
+			}
+
+			if err != nil {
+				return err
+			}
+
+			blob, err := arch.ch.Save(backend.Data, chunk.Data)
+			if err != nil {
+				return err
+			}
+
+			arch.saveUpdate(Stats{Bytes: blob.Size})
+
+			blobs = append(blobs, blob)
+		}
 	}
 
 	node.Content = make([]backend.ID, len(blobs))
@@ -233,13 +281,9 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
 					arch.fileToken <- token
 				}()
 
-				// debug("start: %s", n.path)
-
 				// TODO: handle error
 				arch.SaveFile(n)
-				arch.saveUpdate(Stats{Files: 1, Bytes: n.Size})
-
-				// debug("done: %s", n.path)
+				arch.saveUpdate(Stats{Files: 1})
 			}(node)
 		} else {
 			arch.saveUpdate(Stats{Other: 1})
diff --git a/contenthandler.go b/contenthandler.go
index 514bc0fe3..eaf9b8bd5 100644
--- a/contenthandler.go
+++ b/contenthandler.go
@@ -3,13 +3,9 @@ package khepri
 import (
 	"encoding/json"
 	"errors"
-	"io"
-	"io/ioutil"
-	"os"
 	"sync"
 
 	"github.com/fd0/khepri/backend"
-	"github.com/fd0/khepri/chunker"
 )
 
 type ContentHandler struct {
@@ -118,55 +114,6 @@ func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, err
 	return ch.Save(t, backend.Compress(data))
 }
 
-// SaveFile stores the content of the file on the backend as a Blob by calling
-// Save for each chunk.
-func (ch *ContentHandler) SaveFile(filename string, size uint) (Blobs, error) {
-	file, err := os.Open(filename)
-	defer file.Close()
-	if err != nil {
-		return nil, err
-	}
-
-	// if the file is small enough, store it directly
-	if size < chunker.MinSize {
-		buf, err := ioutil.ReadAll(file)
-		if err != nil {
-			return nil, err
-		}
-
-		blob, err := ch.Save(backend.Data, buf)
-		if err != nil {
-			return nil, err
-		}
-
-		return Blobs{blob}, nil
-	}
-
-	// else store all chunks
-	blobs := Blobs{}
-	chunker := chunker.New(file)
-
-	for {
-		chunk, err := chunker.Next()
-		if err == io.EOF {
-			break
-		}
-
-		if err != nil {
-			return nil, err
-		}
-
-		blob, err := ch.Save(backend.Data, chunk.Data)
-		if err != nil {
-			return nil, err
-		}
-
-		blobs = append(blobs, blob)
-	}
-
-	return blobs, nil
-}
-
 // Load tries to load and decrypt content identified by t and id from the backend.
From 551c7525fe721a1b658bc8f15916d829b3c8be9b Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 17 Nov 2014 23:37:03 +0100
Subject: [PATCH 09/12] Rename chunker variable so it does not shadow package
 chunker

---
 archiver.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/archiver.go b/archiver.go
index 182ce72cb..4d4ac07f4 100644
--- a/archiver.go
+++ b/archiver.go
@@ -142,10 +142,10 @@ func (arch *Archiver) SaveFile(node *Node) error {
 		blobs = Blobs{blob}
 	} else {
 		// else store all chunks
-		chunker := chunker.New(file)
+		chnker := chunker.New(file)
 
 		for {
-			chunk, err := chunker.Next()
+			chunk, err := chnker.Next()
 			if err == io.EOF {
 				break
 			}

From d594cd89b7670eae9b14d37aaf330e9429fab08f Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Wed, 19 Nov 2014 22:56:52 +0100
Subject: [PATCH 10/12] Chunker: remove unneeded return

---
 chunker/chunker.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/chunker/chunker.go b/chunker/chunker.go
index c317672c0..2cd5a4e34 100644
--- a/chunker/chunker.go
+++ b/chunker/chunker.go
@@ -218,8 +218,6 @@ func (c *chunker) Next() (*Chunk, error) {
 		c.pos += steps
 		c.bpos = c.bmax
 	}
-
-	return nil, nil
 }
 
 func (c *chunker) append(b byte) {
From d1e4431514a66eb802d974b9bdd9ec0f91e239aa Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Fri, 21 Nov 2014 21:21:44 +0100
Subject: [PATCH 11/12] Refactor StorageMap to BlobList

---
 Makefile          |   1 -
 archiver.go       |  33 +++++------
 backend/id.go     |   5 ++
 bloblist.go       |  99 +++++++++++++++++++++++++++++++++
 bloblist_test.go  | 137 ++++++++++++++++++++++++++++++++++++++++++++++
 contenthandler.go |  48 +++++++---------
 restorer.go       |   4 +-
 snapshot.go       |  16 +++---
 storagemap.go     |  51 -----------------
 tree.go           |   2 +-
 10 files changed, 285 insertions(+), 111 deletions(-)
 create mode 100644 bloblist.go
 create mode 100644 bloblist_test.go
 delete mode 100644 storagemap.go

diff --git a/Makefile b/Makefile
index be9c4d90b..c58519465 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,6 @@
 .PHONY: clean all test
 
 test:
-	go test -race ./...
 	for dir in cmd/* ; do \
 		(cd "$$dir"; go build -race) \
 	done
diff --git a/archiver.go b/archiver.go
index 4d4ac07f4..226ea5ecc 100644
--- a/archiver.go
+++ b/archiver.go
@@ -20,8 +20,7 @@ type Archiver struct {
 	key *Key
 	ch  *ContentHandler
 
-	m    sync.Mutex
-	smap *StorageMap // blobs used for the current snapshot
+	bl *BlobList // blobs used for the current snapshot
 
 	fileToken chan struct{}
 
@@ -63,7 +62,7 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 	// do nothing
 	arch.ScannerUpdate = func(Stats) {}
 
-	arch.smap = NewStorageMap()
+	arch.bl = NewBlobList()
 	arch.ch, err = NewContentHandler(be, key)
 	if err != nil {
 		return nil, err
@@ -86,30 +85,26 @@ func (arch *Archiver) saveUpdate(stats Stats) {
 	}
 }
 
-func (arch *Archiver) Save(t backend.Type, data []byte) (*Blob, error) {
+func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
 	blob, err := arch.ch.Save(t, data)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	// store blob in storage map for current snapshot
-	arch.m.Lock()
-	defer arch.m.Unlock()
-	arch.smap.Insert(blob)
+	arch.bl.Insert(blob)
 
 	return blob, nil
 }
 
-func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (arch *Archiver) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
 	blob, err := arch.ch.SaveJSON(t, item)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	// store blob in storage map for current snapshot
-	arch.m.Lock()
-	defer arch.m.Unlock()
-	arch.smap.Insert(blob)
+	arch.bl.Insert(blob)
 
 	return blob, nil
 }
@@ -168,9 +163,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
 	node.Content = make([]backend.ID, len(blobs))
 	for i, blob := range blobs {
 		node.Content[i] = blob.ID
-		arch.m.Lock()
-		arch.smap.Insert(blob)
-		arch.m.Unlock()
+		arch.bl.Insert(blob)
 	}
 
 	return err
@@ -258,14 +251,14 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
 	return &Tree{node}, nil
 }
 
-func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
+func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
 	var wg sync.WaitGroup
 
 	for _, node := range *t {
 		if node.Tree != nil && node.Subtree == nil {
 			b, err := arch.saveTree(node.Tree)
 			if err != nil {
-				return nil, err
+				return Blob{}, err
 			}
 			node.Subtree = b.ID
 			arch.saveUpdate(Stats{Directories: 1})
@@ -294,7 +287,7 @@ func (arch *Archiver) saveTree(t *Tree) (*Blob, error) {
 
 	blob, err := arch.SaveJSON(backend.Tree, t)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	return blob, nil
@@ -311,7 +304,7 @@ func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, erro
 	sn.Content = blob.ID
 
 	// save snapshot
-	sn.StorageMap = arch.smap
+	sn.BlobList = arch.bl
 	blob, err = arch.SaveJSON(backend.Snapshot, sn)
 	if err != nil {
 		return nil, nil, err
diff --git a/backend/id.go b/backend/id.go
index ef9067040..0c15cad26 100644
--- a/backend/id.go
+++ b/backend/id.go
@@ -47,6 +47,11 @@ func (id ID) EqualString(other string) (bool, error) {
 	return id.Equal(ID(s)), nil
 }
 
+// Compare compares this ID to another one, returning -1, 0, or 1.
+func (id ID) Compare(other ID) int {
+	return bytes.Compare(other, id)
+}
+
 func (id ID) MarshalJSON() ([]byte, error) {
 	return json.Marshal(id.String())
 }
diff --git a/bloblist.go b/bloblist.go
new file mode 100644
index 000000000..48820e95f
--- /dev/null
+++ b/bloblist.go
@@ -0,0 +1,99 @@
+package khepri
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"sort"
+	"sync"
+)
+
+type BlobList struct {
+	list []Blob
+	m    sync.Mutex
+}
+
+var ErrBlobNotFound = errors.New("Blob not found")
+
+func NewBlobList() *BlobList {
+	return &BlobList{
+		list: []Blob{},
+	}
+}
+
+func (bl *BlobList) find(blob Blob) (int, Blob, error) {
+	pos := sort.Search(len(bl.list), func(i int) bool {
+		return blob.ID.Compare(bl.list[i].ID) >= 0
+	})
+
+	if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 {
+		return pos, bl.list[pos], nil
+	}
+
+	return pos, Blob{}, ErrBlobNotFound
+}
+
+func (bl *BlobList) Find(blob Blob) (Blob, error) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+
+	_, blob, err := bl.find(blob)
+	return blob, err
+}
+
+func (bl *BlobList) Merge(other *BlobList) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+	other.m.Lock()
+	defer other.m.Unlock()
+
+	for _, blob := range other.list {
+		bl.insert(blob)
+	}
+}
+
+func (bl *BlobList) insert(blob Blob) {
+	pos, _, err := bl.find(blob)
+	if err == nil {
+		// already present
+		return
+	}
+
+	// insert blob
+	// https://code.google.com/p/go-wiki/wiki/SliceTricks
+	bl.list = append(bl.list, Blob{})
+	copy(bl.list[pos+1:], bl.list[pos:])
+	bl.list[pos] = blob
+}
+
+func (bl *BlobList) Insert(blob Blob) {
+	bl.m.Lock()
+	defer bl.m.Unlock()
+
+	bl.insert(blob)
+}
+
+func (bl BlobList) MarshalJSON() ([]byte, error) {
+	return json.Marshal(bl.list)
+}
+
+func (bl *BlobList) UnmarshalJSON(data []byte) error {
+	return json.Unmarshal(data, &bl.list)
+}
+
+// Compare compares two blobs by comparing the ID and the size. It returns -1,
+// 0, or 1.
+func (id ID) Compare(other ID) int { + return bytes.Compare(other, id) +} + func (id ID) MarshalJSON() ([]byte, error) { return json.Marshal(id.String()) } diff --git a/bloblist.go b/bloblist.go new file mode 100644 index 000000000..48820e95f --- /dev/null +++ b/bloblist.go @@ -0,0 +1,99 @@ +package khepri + +import ( + "bytes" + "encoding/json" + "errors" + "sort" + "sync" +) + +type BlobList struct { + list []Blob + m sync.Mutex +} + +var ErrBlobNotFound = errors.New("Blob not found") + +func NewBlobList() *BlobList { + return &BlobList{ + list: []Blob{}, + } +} + +func (bl *BlobList) find(blob Blob) (int, Blob, error) { + pos := sort.Search(len(bl.list), func(i int) bool { + return blob.ID.Compare(bl.list[i].ID) >= 0 + }) + + if pos < len(bl.list) && blob.ID.Compare(bl.list[pos].ID) == 0 { + return pos, bl.list[pos], nil + } + + return pos, Blob{}, ErrBlobNotFound +} + +func (bl *BlobList) Find(blob Blob) (Blob, error) { + bl.m.Lock() + defer bl.m.Unlock() + + _, blob, err := bl.find(blob) + return blob, err +} + +func (bl *BlobList) Merge(other *BlobList) { + bl.m.Lock() + defer bl.m.Unlock() + other.m.Lock() + defer other.m.Unlock() + + for _, blob := range other.list { + bl.insert(blob) + } +} + +func (bl *BlobList) insert(blob Blob) { + pos, _, err := bl.find(blob) + if err == nil { + // already present + return + } + + // insert blob + // https://code.google.com/p/go-wiki/wiki/bliceTricks + bl.list = append(bl.list, Blob{}) + copy(bl.list[pos+1:], bl.list[pos:]) + bl.list[pos] = blob +} + +func (bl *BlobList) Insert(blob Blob) { + bl.m.Lock() + defer bl.m.Unlock() + + bl.insert(blob) +} + +func (bl BlobList) MarshalJSON() ([]byte, error) { + return json.Marshal(bl.list) +} + +func (bl *BlobList) UnmarshalJSON(data []byte) error { + return json.Unmarshal(data, &bl.list) +} + +// Compare compares two blobs by comparing the ID and the size. It returns -1, +// 0, or 1. 
diff --git a/contenthandler.go b/contenthandler.go
index eaf9b8bd5..625261993 100644
--- a/contenthandler.go
+++ b/contenthandler.go
@@ -3,7 +3,7 @@ package khepri
 import (
 	"encoding/json"
 	"errors"
-	"sync"
+	"fmt"
 
 	"github.com/fd0/khepri/backend"
 )
@@ -12,16 +12,15 @@ type ContentHandler struct {
 	be  backend.Server
 	key *Key
 
-	m       sync.Mutex
-	content *StorageMap
+	bl *BlobList
 }
 
 // NewContentHandler creates a new content handler.
 func NewContentHandler(be backend.Server, key *Key) (*ContentHandler, error) {
 	ch := &ContentHandler{
-		be:      be,
-		key:     key,
-		content: NewStorageMap(),
+		be:  be,
+		key: key,
+		bl:  NewBlobList(),
 	}
 
 	return ch, nil
@@ -34,9 +33,8 @@ func (ch *ContentHandler) LoadSnapshot(id backend.ID) (*Snapshot, error) {
 		return nil, err
 	}
 
-	ch.m.Lock()
-	defer ch.m.Unlock()
-	ch.content.Merge(sn.StorageMap)
+	ch.bl.Merge(sn.BlobList)
+
 	return sn, nil
 }
 
@@ -50,9 +48,7 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
 			return
 		}
 
-		ch.m.Lock()
-		defer ch.m.Unlock()
-		ch.content.Merge(sn.StorageMap)
+		ch.bl.Merge(sn.BlobList)
 	})
 	if err != nil {
 		return err
@@ -63,20 +59,18 @@ func (ch *ContentHandler) LoadAllSnapshots() error {
 
 // Save encrypts data and stores it to the backend as type t. If the data was
 // already saved before, the blob is returned.
-func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
+func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
 	// compute plaintext hash
 	id := backend.Hash(data)
 
 	// test if the hash is already in the backend
-	ch.m.Lock()
-	defer ch.m.Unlock()
-	blob := ch.content.Find(id)
-	if blob != nil {
+	blob, err := ch.bl.Find(Blob{ID: id})
+	if err == nil {
 		return blob, nil
 	}
 
 	// else create a new blob
-	blob = &Blob{
+	blob = Blob{
 		ID:   id,
 		Size: uint64(len(data)),
 	}
@@ -84,30 +78,30 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (*Blob, error) {
 	// encrypt blob
 	ciphertext, err := ch.key.Encrypt(data)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	// save blob
 	sid, err := ch.be.Create(t, ciphertext)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	blob.Storage = sid
 	blob.StorageSize = uint64(len(ciphertext))
 
 	// insert blob into the storage map
-	ch.content.Insert(blob)
+	ch.bl.Insert(blob)
 
 	return blob, nil
 }
 
 // SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t.
-func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (*Blob, error) {
+func (ch *ContentHandler) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
 	// convert to json
 	data, err := json.Marshal(item)
 	if err != nil {
-		return nil, err
+		return Blob{}, err
 	}
 
 	// compress and save data
@@ -133,11 +127,9 @@ func (ch *ContentHandler) Load(t backend.Type, id backend.ID) ([]byte, error) {
 	}
 
 	// lookup storage hash
-	ch.m.Lock()
-	defer ch.m.Unlock()
-	blob := ch.content.Find(id)
-	if blob == nil {
-		return nil, errors.New("Storage ID not found")
+	blob, err := ch.bl.Find(Blob{ID: id})
+	if err != nil {
+		return nil, fmt.Errorf("Storage ID %s not found", id)
 	}
 
 	// load data
diff --git a/restorer.go b/restorer.go
index 8e94f5411..c1717db3f 100644
--- a/restorer.go
+++ b/restorer.go
@@ -30,12 +30,12 @@ func NewRestorer(be backend.Server, key *Key, snid backend.ID) (*Restorer, error
 	var err error
 	r.ch, err = NewContentHandler(be, key)
 	if err != nil {
-		return nil, err
+		return nil, arrar.Annotate(err, "create contenthandler for restorer")
 	}
 
 	r.sn, err = r.ch.LoadSnapshot(snid)
 	if err != nil {
-		return nil, err
+		return nil, arrar.Annotate(err, "load snapshot for restorer")
 	}
 
 	// abort on all errors
diff --git a/snapshot.go b/snapshot.go
index fa7d10e1d..c0cc6c2bb 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -11,14 +11,14 @@ import (
 )
 
 type Snapshot struct {
-	Time       time.Time   `json:"time"`
-	Content    backend.ID  `json:"content"`
-	StorageMap *StorageMap `json:"map"`
-	Dir        string      `json:"dir"`
-	Hostname   string      `json:"hostname,omitempty"`
-	Username   string      `json:"username,omitempty"`
-	UID        string      `json:"uid,omitempty"`
-	GID        string      `json:"gid,omitempty"`
+	Time     time.Time  `json:"time"`
+	Content  backend.ID `json:"content"`
+	BlobList *BlobList  `json:"blobs"`
+	Dir      string     `json:"dir"`
+	Hostname string     `json:"hostname,omitempty"`
+	Username string     `json:"username,omitempty"`
+	UID      string     `json:"uid,omitempty"`
+	GID      string     `json:"gid,omitempty"`
 
 	id backend.ID // plaintext ID, used during restore
 }
diff --git a/storagemap.go b/storagemap.go
deleted file mode 100644
index db77952e2..000000000
--- a/storagemap.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package khepri
-
-import (
-	"bytes"
-	"sort"
-
-	"github.com/fd0/khepri/backend"
-)
-
-type StorageMap Blobs
-
-func NewStorageMap() *StorageMap {
-	return &StorageMap{}
-}
-
-func (m StorageMap) find(id backend.ID) (int, *Blob) {
-	i := sort.Search(len(m), func(i int) bool {
-		return bytes.Compare(m[i].ID, id) >= 0
-	})
-
-	if i < len(m) && bytes.Equal(m[i].ID, id) {
-		return i, m[i]
-	}
-
-	return i, nil
-}
-
-func (m StorageMap) Find(id backend.ID) *Blob {
-	_, blob := m.find(id)
-	return blob
-}
-
-func (m *StorageMap) Insert(blob *Blob) {
-	pos, b := m.find(blob.ID)
-	if b != nil {
-		// already present
-		return
-	}
-
-	// insert blob
-	// https://code.google.com/p/go-wiki/wiki/SliceTricks
-	*m = append(*m, nil)
-	copy((*m)[pos+1:], (*m)[pos:])
-	(*m)[pos] = blob
-}
-
-func (m *StorageMap) Merge(sm *StorageMap) {
-	for _, blob := range *sm {
-		m.Insert(blob)
-	}
-}
diff --git a/tree.go b/tree.go
index 778388122..0aa41a44d 100644
--- a/tree.go
+++ b/tree.go
@@ -46,7 +46,7 @@ type Blob struct {
 	StorageSize uint64 `json:"ssize,omitempty"` // encrypted Size
 }
 
-type Blobs []*Blob
+type Blobs []Blob
 
 func (n Node) String() string {
 	switch n.Type {
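A subtlety in the new BlobList: ID.Compare is defined as bytes.Compare(other, id), the reverse of the usual receiver-first convention, so a positive result means the argument is larger. find() is written against exactly that convention, and the net effect is the standard one: sort.Search locates the first position whose ID is not smaller than the probe, keeping the slice in ascending byte order with O(log n) lookups and O(n) inserts. The underlying sorted-slice technique in isolation (insertSorted is an illustrative helper, shown with plain ints):

    // insertSorted keeps xs in ascending order and drops duplicates.
    func insertSorted(xs []int, x int) []int {
    	// index of the first element >= x
    	pos := sort.Search(len(xs), func(i int) bool { return xs[i] >= x })
    	if pos < len(xs) && xs[pos] == x {
    		return xs // already present
    	}
    	xs = append(xs, 0)         // grow by one slot
    	copy(xs[pos+1:], xs[pos:]) // shift the tail right
    	xs[pos] = x
    	return xs
    }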
From d11688f2423ce16b2f7880dd153ec86a10432fe1 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 22 Nov 2014 22:05:39 +0100
Subject: [PATCH 12/12] Save multiple data blobs in parallel

---
 archiver.go | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/archiver.go b/archiver.go
index 226ea5ecc..4fa33819a 100644
--- a/archiver.go
+++ b/archiver.go
@@ -13,6 +13,7 @@ import (
 
 const (
 	maxConcurrentFiles = 32
+	maxConcurrentBlobs = 32
 )
 
 type Archiver struct {
@@ -23,6 +24,7 @@ type Archiver struct {
 	bl *BlobList // blobs used for the current snapshot
 
 	fileToken chan struct{}
+	blobToken chan struct{}
 
 	Stats Stats
 
@@ -48,13 +50,18 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
 		be:        be,
 		key:       key,
 		fileToken: make(chan struct{}, maxConcurrentFiles),
+		blobToken: make(chan struct{}, maxConcurrentBlobs),
 	}
 
-	// fill file token
+	// fill file and blob token
 	for i := 0; i < maxConcurrentFiles; i++ {
 		arch.fileToken <- struct{}{}
 	}
 
+	for i := 0; i < maxConcurrentBlobs; i++ {
+		arch.blobToken <- struct{}{}
+	}
+
 	// abort on all errors
 	arch.Error = func(string, os.FileInfo, error) error { return err }
 	// allow all files
@@ -138,6 +145,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
 	} else {
 		// else store all chunks
 		chnker := chunker.New(file)
+		chans := [](<-chan Blob){}
 
 		for {
 			chunk, err := chnker.Next()
@@ -149,14 +157,28 @@ func (arch *Archiver) SaveFile(node *Node) error {
 				return err
 			}
 
-			blob, err := arch.ch.Save(backend.Data, chunk.Data)
-			if err != nil {
-				return err
-			}
+			// acquire token, start goroutine to save chunk
+			token := <-arch.blobToken
+			resCh := make(chan Blob, 1)
 
-			arch.saveUpdate(Stats{Bytes: blob.Size})
+			go func(ch chan<- Blob) {
+				blob, err := arch.ch.Save(backend.Data, chunk.Data)
+				// TODO handle error
+				if err != nil {
+					panic(err)
+				}
 
-			blobs = append(blobs, blob)
+				arch.saveUpdate(Stats{Bytes: blob.Size})
+				arch.blobToken <- token
+				ch <- blob
+			}(resCh)
+
+			chans = append(chans, resCh)
+		}
+
+		blobs = []Blob{}
+		for _, ch := range chans {
+			blobs = append(blobs, <-ch)
 		}
 	}
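Chunks must be reassembled in file order, so the per-chunk goroutines cannot simply append to a shared slice as they finish; instead every chunk gets its own one-element result channel, and the collection loop drains those channels in submission order. Note this is safe only because chunk is re-declared with := on each loop iteration, so each goroutine captures its own copy. The pattern in isolation (saveAll and process are illustrative names, not part of the patch):

    // saveAll processes items concurrently but returns results in
    // input order: each worker writes to its own buffered channel,
    // and the final loop reads the channels in submission order.
    func saveAll(items [][]byte, limit int, process func([]byte) string) []string {
    	sem := make(chan struct{}, limit) // bounds concurrency
    	chans := make([]chan string, len(items))

    	for i, item := range items {
    		chans[i] = make(chan string, 1)
    		sem <- struct{}{}
    		go func(item []byte, out chan<- string) {
    			defer func() { <-sem }()
    			out <- process(item)
    		}(item, chans[i])
    	}

    	results := make([]string, 0, len(items))
    	for _, out := range chans {
    		results = append(results, <-out)
    	}
    	return results
    }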