From 4fb140126654fdd4a7a05e32de10e839b06d1872 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 13:17:33 +0200
Subject: [PATCH 01/12] Fix --cacert help text

---
 cmd/restic/global.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/restic/global.go b/cmd/restic/global.go
index 8c89a4d80..6f9d657e3 100644
--- a/cmd/restic/global.go
+++ b/cmd/restic/global.go
@@ -94,7 +94,7 @@ func init() {
 	f.BoolVarP(&globalOptions.JSON, "json", "", false, "set output mode to JSON for commands that support it")
 	f.StringVar(&globalOptions.CacheDir, "cache-dir", "", "set the cache directory")
 	f.BoolVar(&globalOptions.NoCache, "no-cache", false, "do not use a local cache")
-	f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "path to load root certificates from (default: use system certificates)")
+	f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "`file` to load root certificates from (default: use system certificates)")
 	f.StringVar(&globalOptions.TLSClientCert, "tls-client-cert", "", "path to a file containing PEM encoded TLS client certificate and private key")
 	f.BoolVar(&globalOptions.CleanupCache, "cleanup-cache", false, "auto remove old cache directories")
 	f.IntVar(&globalOptions.LimitUploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")

From 4ba8d402820b715535df39f6283e9281a91415cf Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 30 Apr 2018 14:18:12 +0200
Subject: [PATCH 02/12] Add block profile option

---
 cmd/restic/global_debug.go | 29 ++++++++++++++++++-----------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/cmd/restic/global_debug.go b/cmd/restic/global_debug.go
index cb7dac10a..bc6a51993 100644
--- a/cmd/restic/global_debug.go
+++ b/cmd/restic/global_debug.go
@@ -15,19 +15,21 @@ import (
 )
 
 var (
-	listenMemoryProfile string
-	memProfilePath      string
-	cpuProfilePath      string
-	traceProfilePath    string
-	insecure            bool
+	listenProfile    string
+	memProfilePath   string
+	cpuProfilePath   string
+	traceProfilePath string
+	blockProfilePath string
+	insecure         bool
 )
 
 func init() {
 	f := cmdRoot.PersistentFlags()
-	f.StringVar(&listenMemoryProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
+	f.StringVar(&listenProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
 	f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`")
 	f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`")
 	f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`")
+	f.StringVar(&blockProfilePath, "block-profile", "", "write block profile to `dir`")
 	f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings")
 }
 
@@ -38,12 +40,12 @@ func (fakeTestingTB) Logf(msg string, args ...interface{}) {
 }
 
 func runDebug() error {
-	if listenMemoryProfile != "" {
-		fmt.Fprintf(os.Stderr, "running memory profile HTTP server on %v\n", listenMemoryProfile)
+	if listenProfile != "" {
+		fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", listenProfile)
 		go func() {
-			err := http.ListenAndServe(listenMemoryProfile, nil)
+			err := http.ListenAndServe(listenProfile, nil)
 			if err != nil {
-				fmt.Fprintf(os.Stderr, "memory profile listen failed: %v\n", err)
+				fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err)
 			}
 		}()
 	}
@@ -58,9 +60,12 @@ func runDebug() error {
 	if traceProfilePath != "" {
 		profilesEnabled++
 	}
+	if blockProfilePath != "" {
+		profilesEnabled++
+	}
 
 	if profilesEnabled > 1 {
-		return errors.Fatal("only one profile (memory or CPU) may be activated at the same time")
+		return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time")
 	}
 
 	var prof interface {
@@ -73,6 +78,8 @@ func runDebug() error {
 		prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath))
 	} else if traceProfilePath != "" {
 		prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath))
+	} else if blockProfilePath != "" {
+		prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(blockProfilePath))
 	}
 
 	if prof != nil {
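Editor's note: [PATCH 02/12] wires the new --block-profile flag into the same github.com/pkg/profile plumbing already used for the memory, CPU and trace profiles. For readers unfamiliar with that library, here is a minimal standalone sketch of what the call above does; the output directory and the tiny workload are made up for illustration and are not part of the patch:

package main

import (
	"sync"

	"github.com/pkg/profile"
)

func main() {
	// BlockProfile records where goroutines block (channel operations,
	// mutex waits); the resulting profile in the chosen directory can be
	// inspected with `go tool pprof`.
	defer profile.Start(profile.Quiet, profile.NoShutdownHook,
		profile.BlockProfile, profile.ProfilePath("/tmp/profile")).Stop()

	// A tiny workload that actually blocks, so the profile is not empty.
	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { defer wg.Done(); ch <- 1 }()
	<-ch
	wg.Wait()
}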
From d926b9fd803fd5742f35b0693b3743f3d28c875d Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 15:42:05 +0200
Subject: [PATCH 03/12] Add profile build flag

---
 cmd/restic/global_debug.go   | 2 +-
 cmd/restic/global_release.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cmd/restic/global_debug.go b/cmd/restic/global_debug.go
index bc6a51993..6f04d047b 100644
--- a/cmd/restic/global_debug.go
+++ b/cmd/restic/global_debug.go
@@ -1,4 +1,4 @@
-// +build debug
+// +build debug profile
 
 package main
 
diff --git a/cmd/restic/global_release.go b/cmd/restic/global_release.go
index 04c7cba31..f17d99639 100644
--- a/cmd/restic/global_release.go
+++ b/cmd/restic/global_release.go
@@ -1,4 +1,4 @@
-// +build !debug
+// +build !debug,!profile
 
 package main

From d8bbe5dc843c67239e5ab584b7ba84ec3bbbe8f0 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 14:19:10 +0200
Subject: [PATCH 04/12] Print repository ID after opening

---
 cmd/restic/global.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/cmd/restic/global.go b/cmd/restic/global.go
index 6f9d657e3..6fe026bc9 100644
--- a/cmd/restic/global.go
+++ b/cmd/restic/global.go
@@ -355,7 +355,11 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
 	}
 
 	if stdoutIsTerminal() {
-		Verbosef("password is correct\n")
+		id := s.Config().ID
+		if len(id) > 8 {
+			id = id[:8]
+		}
+		Verbosef("repository %v opened successfully, password is correct\n", id)
 	}
 
 	if opts.NoCache {

From 846c2b6869d599773225dac529656f030304bcae Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 14:40:49 +0200
Subject: [PATCH 05/12] backup: Fix ETA calculation for >= 100%

---
 internal/ui/backup.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/ui/backup.go b/internal/ui/backup.go
index ebd56b8bc..f6f840136 100644
--- a/internal/ui/backup.go
+++ b/internal/ui/backup.go
@@ -147,7 +147,7 @@ func (b *Backup) update(total, processed counter, errors uint, currentFiles map[
 	} else {
 		var eta string
 
-		if secs > 0 {
+		if secs > 0 && processed.Bytes < total.Bytes {
 			eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
 		}
 
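Editor's note: [PATCH 05/12] adds processed.Bytes < total.Bytes to the ETA guard. While the scanner is still running the total keeps growing, and once the processed bytes catch up with (or overtake) it the remaining-bytes estimate is zero or negative, so printing an ETA would be misleading. A small illustrative helper with the same guard (hypothetical names, not restic's actual code):

// etaSeconds reports no ETA when there is no throughput yet or when the
// processed bytes already reached the (possibly still growing) total.
func etaSeconds(processedBytes, totalBytes, bytesPerSecond uint64) (uint64, bool) {
	if bytesPerSecond == 0 || processedBytes >= totalBytes {
		return 0, false
	}
	return (totalBytes - processedBytes) / bytesPerSecond, true
}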
From d80e108b03840c58a65a223b8a5ac2dcd86de135 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 15:01:21 +0200
Subject: [PATCH 06/12] backup: Clear status lines on finish

---
 internal/ui/backup.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/internal/ui/backup.go b/internal/ui/backup.go
index f6f840136..71aaa5552 100644
--- a/internal/ui/backup.go
+++ b/internal/ui/backup.go
@@ -40,6 +40,7 @@ type Backup struct {
 	processedCh chan counter
 	errCh       chan struct{}
 	workerCh    chan fileWorkerMessage
+	clearStatus chan struct{}
 
 	summary struct {
 		sync.Mutex
@@ -68,6 +69,7 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
 		processedCh: make(chan counter),
 		errCh:       make(chan struct{}),
 		workerCh:    make(chan fileWorkerMessage),
+		clearStatus: make(chan struct{}),
 	}
 }
 
@@ -90,6 +92,9 @@ func (b *Backup) Run(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			return nil
+		case <-b.clearStatus:
+			started = false
+			b.term.SetStatus([]string{""})
 		case t, ok := <-b.totalCh:
 			if ok {
 				total = t
@@ -332,6 +337,8 @@ func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
 
 // Finish prints the finishing messages.
 func (b *Backup) Finish() {
+	b.clearStatus <- struct{}{}
+
 	b.V("processed %s in %s", formatBytes(b.totalBytes), formatDuration(time.Since(b.start)))
 	b.V("\n")
 	b.V("Files:       %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
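Editor's note: [PATCH 06/12] lets Finish ask the Run goroutine to wipe the status lines before the summary is printed. Because clearStatus is unbuffered, the send in Finish only returns once the Run loop has accepted the request. The same hand-off pattern in isolation (illustrative only, not restic's code):

package main

import "fmt"

func main() {
	clearReq := make(chan struct{}) // unbuffered: the send blocks until received
	done := make(chan struct{})

	go func() {
		<-clearReq                    // stands in for the Run loop's select case
		fmt.Println("status cleared") // stands in for term.SetStatus([]string{""})
		close(done)
	}()

	clearReq <- struct{}{} // Finish: returns once the worker has picked it up
	<-done
}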
From 400730afca394205afc761c98189428eadc0675c Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 13:20:12 +0200
Subject: [PATCH 07/12] archiver: Improve memory usage, tune buffer pool

---
 internal/archiver/archiver.go   |  6 +++++-
 internal/archiver/file_saver.go | 13 +++++++++----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 143c81e34..b5e2cd3b3 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -720,7 +720,11 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
 // runWorkers starts the worker pools, which are stopped when the context is cancelled.
 func (arch *Archiver) runWorkers(ctx context.Context) {
 	arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
-	arch.fileSaver = NewFileSaver(ctx, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, arch.Options.FileReadConcurrency)
+	arch.fileSaver = NewFileSaver(ctx,
+		arch.FS,
+		arch.blobSaver,
+		arch.Repo.Config().ChunkerPolynomial,
+		arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
 
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 9a923c6c7..32232e28f 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -63,20 +63,25 @@ type FileSaver struct {
 
 // NewFileSaver returns a new file saver. A worker pool with workers is
 // started, it is stopped when ctx is cancelled.
-func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, workers uint) *FileSaver {
-	ch := make(chan saveFileJob, workers)
+func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
+	ch := make(chan saveFileJob, fileWorkers)
+
+	poolSize := fileWorkers
+	if blobWorkers > fileWorkers {
+		poolSize = blobWorkers
+	}
 
 	s := &FileSaver{
 		fs:           fs,
 		blobSaver:    blobSaver,
-		saveFilePool: NewBufferPool(ctx, 3*int(workers), chunker.MaxSize/4),
+		saveFilePool: NewBufferPool(ctx, int(poolSize)*3/2, chunker.MaxSize/2),
 		pol:          pol,
 		ch:           ch,
 
 		CompleteBlob: func(string, uint64) {},
 	}
 
-	for i := uint(0); i < workers; i++ {
+	for i := uint(0); i < fileWorkers; i++ {
 		s.wg.Add(1)
 		go s.worker(ctx, &s.wg, ch)
 	}
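Editor's note: to put the retuning in [PATCH 07/12] into numbers, the pool used to retain at most 3×fileWorkers buffers of chunker.MaxSize/4 and now retains 1.5×max(fileWorkers, blobWorkers) buffers of chunker.MaxSize/2. A back-of-the-envelope sketch with assumed values (two file-read workers, four blob workers, and chunker.MaxSize taken to be 8 MiB — none of these numbers appear in the patch itself):

package main

import "fmt"

func main() {
	const maxSize = 8 << 20 // assumed chunker.MaxSize (8 MiB)
	fileWorkers, blobWorkers := 2, 4

	oldBuffers := 3 * fileWorkers          // 6 buffers
	oldBytes := oldBuffers * (maxSize / 4) // 12 MiB retained at most

	poolSize := fileWorkers
	if blobWorkers > fileWorkers {
		poolSize = blobWorkers
	}
	newBuffers := poolSize * 3 / 2         // 6 buffers
	newBytes := newBuffers * (maxSize / 2) // 24 MiB retained at most

	fmt.Printf("old: %d buffers, %d MiB; new: %d buffers, %d MiB\n",
		oldBuffers, oldBytes>>20, newBuffers, newBytes>>20)
}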
From 39ac12f6eab33c6d97905a23f6e1eb5a97b2c555 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 13:42:23 +0200
Subject: [PATCH 08/12] archiver: Correct comment

---
 internal/archiver/file_saver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 32232e28f..bab2d66fd 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -13,7 +13,7 @@ import (
 	"github.com/restic/restic/internal/restic"
 )
 
-// FutureFile is returned by SaveFile and will return the data once it
+// FutureFile is returned by Save and will return the data once it
 // has been processed.
 type FutureFile struct {
 	ch <-chan saveFileResponse

From 78bd591c7c1005762e2d066d1b01b0eec42935e9 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 29 Apr 2018 15:34:41 +0200
Subject: [PATCH 09/12] archiver: Improve buffer pool

---
 internal/archiver/archiver.go   |  2 +-
 internal/archiver/blob_saver.go |  4 ++--
 internal/archiver/buffer.go     | 37 ++++++++++++++++-----------------
 internal/archiver/file_saver.go | 14 ++++++-------
 4 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index b5e2cd3b3..5b3fd611a 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -172,7 +172,7 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
 	// adds a newline after each object)
 	buf = append(buf, '\n')
 
-	b := Buffer{Data: buf}
+	b := &Buffer{Data: buf}
 	res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
 	if res.Err() != nil {
 		return restic.ID{}, s, res.Err()
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index 5e45d7175..1863d440e 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -45,7 +45,7 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. The second return parameter is true if the blob was
 // previously unknown.
-func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob {
+func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
 	ch := make(chan saveBlobResponse, 1)
 	s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
 
@@ -91,7 +91,7 @@ func (s *FutureBlob) Length() int {
 
 type saveBlobJob struct {
 	restic.BlobType
-	buf Buffer
+	buf *Buffer
 	ch  chan<- saveBlobResponse
 }
 
diff --git a/internal/archiver/buffer.go b/internal/archiver/buffer.go
index c97d990cf..ef7131322 100644
--- a/internal/archiver/buffer.go
+++ b/internal/archiver/buffer.go
@@ -9,19 +9,19 @@ import (
 // be called so the underlying slice is put back into the pool.
 type Buffer struct {
 	Data []byte
-	Put  func([]byte)
+	Put  func(*Buffer)
 }
 
 // Release puts the buffer back into the pool it came from.
-func (b Buffer) Release() {
+func (b *Buffer) Release() {
 	if b.Put != nil {
-		b.Put(b.Data)
+		b.Put(b)
 	}
 }
 
 // BufferPool implements a limited set of reusable buffers.
 type BufferPool struct {
-	ch          chan []byte
+	ch          chan *Buffer
 	chM         sync.Mutex
 	defaultSize int
 	clearOnce   sync.Once
@@ -33,7 +33,7 @@ type BufferPool struct {
 // back.
 func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
 	b := &BufferPool{
-		ch:          make(chan []byte, max),
+		ch:          make(chan *Buffer, max),
 		defaultSize: defaultSize,
 	}
 	go func() {
@@ -44,22 +44,29 @@ func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
 }
 
 // Get returns a new buffer, either from the pool or newly allocated.
-func (pool *BufferPool) Get() Buffer {
-	b := Buffer{Put: pool.put}
-
+func (pool *BufferPool) Get() *Buffer {
 	pool.chM.Lock()
 	defer pool.chM.Unlock()
 	select {
 	case buf := <-pool.ch:
-		b.Data = buf
+		return buf
 	default:
-		b.Data = make([]byte, pool.defaultSize)
+	}
+
+	b := &Buffer{
+		Put:  pool.Put,
+		Data: make([]byte, pool.defaultSize),
 	}
 
 	return b
 }
 
-func (pool *BufferPool) put(b []byte) {
+// Put returns a buffer to the pool for reuse.
+func (pool *BufferPool) Put(b *Buffer) {
+	if cap(b.Data) > pool.defaultSize {
+		return
+	}
+
 	pool.chM.Lock()
 	defer pool.chM.Unlock()
 	select {
@@ -68,14 +75,6 @@ func (pool *BufferPool) put(b []byte) {
 	}
 }
 
-// Put returns a buffer to the pool for reuse.
-func (pool *BufferPool) Put(b Buffer) {
-	if cap(b.Data) > pool.defaultSize {
-		return
-	}
-	pool.put(b.Data)
-}
-
 // clear empties the buffer so that all items can be garbage collected.
 func (pool *BufferPool) clear() {
 	pool.clearOnce.Do(func() {
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 32232e28f..276528564 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -61,20 +61,19 @@ type FileSaver struct {
 	NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
 }
 
-// NewFileSaver returns a new file saver. A worker pool with workers is
+// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
 func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
-	ch := make(chan saveFileJob, fileWorkers)
+	ch := make(chan saveFileJob)
 
-	poolSize := fileWorkers
-	if blobWorkers > fileWorkers {
-		poolSize = blobWorkers
-	}
+	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
+
+	poolSize := fileWorkers + blobWorkers
 
 	s := &FileSaver{
 		fs:           fs,
 		blobSaver:    blobSaver,
-		saveFilePool: NewBufferPool(ctx, int(poolSize)*3/2, chunker.MaxSize/2),
+		saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
 		pol:          pol,
 		ch:           ch,
 
@@ -156,6 +155,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 			buf.Release()
 			break
 		}
+		buf.Data = chunk.Data
 
 		size += uint64(chunk.Length)
 
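Editor's note: after [PATCH 09/12], the pool stores whole *Buffer values instead of raw byte slices, and Release hands the pointer back through its Put field. Callers use it the same way as before; a small usage sketch against the API shown in the diff (the helper and the sizes are illustrative, e.g. something that could live in a test inside package archiver):

func exampleBufferPoolUse(ctx context.Context) {
	pool := NewBufferPool(ctx, 4, 1<<20) // keep at most 4 buffers of 1 MiB around

	buf := pool.Get() // *Buffer: reused from the pool, or freshly allocated
	_ = buf.Data      // use buf.Data like any []byte
	buf.Release()     // calls buf.Put(buf), i.e. pool.Put, offering it for reuse
}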
From 4e34325035d23df6ec302eb50159d863dc887db2 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 30 Apr 2018 15:13:03 +0200
Subject: [PATCH 10/12] archiver: Process dirs concurrently

---
 internal/archiver/archiver.go      | 102 ++++++++++---------
 internal/archiver/archiver_test.go |  14 ++-
 internal/archiver/blob_saver.go    |   2 +-
 internal/archiver/tree_saver.go    | 158 +++++++++++++++++++++++++++++
 4 files changed, 223 insertions(+), 53 deletions(-)
 create mode 100644 internal/archiver/tree_saver.go

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 5b3fd611a..677976153 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -50,6 +50,7 @@ type Archiver struct {
 
 	blobSaver *BlobSaver
 	fileSaver *FileSaver
+	treeSaver *TreeSaver
 
 	// Error is called for all errors that occur during backup.
 	Error ErrorFunc
@@ -86,6 +87,10 @@ type Options struct {
 	// concurrently. If it's set to zero, the default is the number of CPUs
 	// available in the system.
 	SaveBlobConcurrency uint
+
+	// SaveTreeConcurrency sets how many trees are marshalled and saved to the
+	// repo concurrently.
+	SaveTreeConcurrency uint
 }
 
 // ApplyDefaults returns a copy of o with the default options set for all unset
@@ -102,6 +107,12 @@ func (o Options) ApplyDefaults() Options {
 		o.SaveBlobConcurrency = uint(runtime.NumCPU())
 	}
 
+	if o.SaveTreeConcurrency == 0 {
+		// use a relatively high concurrency here, having multiple SaveTree
+		// workers is cheap
+		o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20
+	}
+
 	return o
 }
 
@@ -212,24 +223,20 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *resti
 
 // SaveDir stores a directory in the repo and returns the node. snPath is the
 // path within the current snapshot.
-func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) {
+func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (d FutureTree, err error) {
 	debug.Log("%v %v", snPath, dir)
 
-	var s ItemStats
-
 	treeNode, err := arch.nodeFromFileInfo(dir, fi)
 	if err != nil {
-		return nil, s, err
+		return FutureTree{}, err
 	}
 
 	names, err := readdirnames(arch.FS, dir)
 	if err != nil {
-		return nil, s, err
+		return FutureTree{}, err
 	}
 
-	var futures []FutureNode
-
-	tree := restic.NewTree()
+	nodes := make([]FutureNode, 0, len(names))
 
 	for _, name := range names {
 		pathname := arch.FS.Join(dir, name)
@@ -245,54 +252,22 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
 				continue
 			}
 
-			return nil, s, err
+			return FutureTree{}, err
 		}
 
 		if excluded {
 			continue
 		}
 
-		futures = append(futures, fn)
+		nodes = append(nodes, fn)
 	}
 
-	for _, fn := range futures {
-		fn.wait()
-
-		// return the error if it wasn't ignored
-		if fn.err != nil {
-			fn.err = arch.error(fn.target, fn.fi, fn.err)
-			if fn.err == nil {
-				// ignore error
-				continue
-			}
-
-			return nil, s, fn.err
-		}
-
-		// when the error is ignored, the node could not be saved, so ignore it
-		if fn.node == nil {
-			debug.Log("%v excluded: %v", fn.snPath, fn.target)
-			continue
-		}
-
-		err := tree.Insert(fn.node)
-		if err != nil {
-			return nil, s, err
-		}
-	}
-
-	id, treeStats, err := arch.saveTree(ctx, tree)
-	if err != nil {
-		return nil, ItemStats{}, err
-	}
-
-	s.Add(treeStats)
-
-	treeNode.Subtree = &id
-	return treeNode, s, nil
+	ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes)
+
+	return ft, nil
 }
 
-// FutureNode holds a reference to a node or a FutureFile.
+// FutureNode holds a reference to a node, FutureFile, or FutureTree.
 type FutureNode struct {
 	snPath, target string
 
@@ -306,14 +281,31 @@ type FutureNode struct {
 
 	isFile bool
 	file   FutureFile
+	isDir  bool
+	dir    FutureTree
 }
 
-func (fn *FutureNode) wait() {
-	if fn.isFile {
+func (fn *FutureNode) wait(ctx context.Context) {
+	switch {
+	case fn.isFile:
 		// wait for and collect the data for the file
 		fn.node = fn.file.Node()
 		fn.err = fn.file.Err()
 		fn.stats = fn.file.Stats()
+
+		// ensure the other stuff can be garbage-collected
+		fn.file = FutureFile{}
+		fn.isFile = false
+
+	case fn.isDir:
+		// wait for and collect the data for the dir
+		fn.node = fn.dir.Node()
+		fn.err = fn.dir.Err()
+		fn.stats = fn.dir.Stats()
+
+		// ensure the other stuff can be garbage-collected
+		fn.dir = FutureTree{}
+		fn.isDir = false
 	}
 }
 
@@ -324,6 +316,8 @@ func (fn *FutureNode) wait() {
 //
 // snPath is the path within the current snapshot.
 func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) {
+	start := time.Now()
+
 	fn = FutureNode{
 		snPath: snPath,
 		target: target,
 	}
@@ -400,7 +394,9 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		snItem := snPath + "/"
 		start := time.Now()
 		oldSubtree := arch.loadSubtree(ctx, previous)
-		fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
+
+		fn.isDir = true
+		fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
 		if err == nil {
 			arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
 		} else {
@@ -429,6 +425,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		}
 	}
 
+	debug.Log("return after %.3f", time.Since(start).Seconds())
+
 	return fn, false, nil
 }
 
@@ -564,9 +562,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
 		arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start))
 	}
 
+	debug.Log("waiting on %d nodes", len(futureNodes))
+
 	// process all futures
 	for name, fn := range futureNodes {
-		fn.wait()
+		fn.wait(ctx)
 
 		// return the error, or ignore it
 		if fn.err != nil {
@@ -720,14 +720,16 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
 // runWorkers starts the worker pools, which are stopped when the context is cancelled.
 func (arch *Archiver) runWorkers(ctx context.Context) {
 	arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
+
 	arch.fileSaver = NewFileSaver(ctx,
 		arch.FS,
 		arch.blobSaver,
 		arch.Repo.Config().ChunkerPolynomial,
 		arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
-
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
+
+	arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
 }
 
 // Snapshot saves several targets and returns a snapshot.
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index a8557ef2a..8916f58a3 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -608,7 +608,12 @@ func TestArchiverSaveDir(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
+			ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			node, stats, err := ft.Node(), ft.Stats(), ft.Err()
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -681,7 +686,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
+	ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	node, stats, err := ft.Node(), ft.Stats(), ft.Err()
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index 1863d440e..4d0f39c48 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -27,7 +27,7 @@ type BlobSaver struct {
 // NewBlobSaver returns a new blob. A worker pool is started, it is stopped
 // when ctx is cancelled.
 func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
-	ch := make(chan saveBlobJob, 2*int(workers))
+	ch := make(chan saveBlobJob)
 	s := &BlobSaver{
 		repo:       repo,
 		knownBlobs: restic.NewBlobSet(),
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
new file mode 100644
index 000000000..6428c6289
--- /dev/null
+++ b/internal/archiver/tree_saver.go
@@ -0,0 +1,158 @@
+package archiver
+
+import (
+	"context"
+	"sync"
+
+	"github.com/restic/restic/internal/debug"
+	"github.com/restic/restic/internal/restic"
+)
+
+// FutureTree is returned by Save and will return the data once it
+// has been processed.
+type FutureTree struct {
+	ch  <-chan saveTreeResponse
+	res saveTreeResponse
+}
+
+func (s *FutureTree) wait() {
+	res, ok := <-s.ch
+	if ok {
+		s.res = res
+	}
+}
+
+// Node returns the node once it is available.
+func (s *FutureTree) Node() *restic.Node {
+	s.wait()
+	return s.res.node
+}
+
+// Stats returns the stats for the file once they are available.
+func (s *FutureTree) Stats() ItemStats {
+	s.wait()
+	return s.res.stats
+}
+
+// Err returns the error in case an error occurred.
+func (s *FutureTree) Err() error {
+	s.wait()
+	return s.res.err
+}
+
+// TreeSaver concurrently saves incoming trees to the repo.
+type TreeSaver struct {
+	saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
+	errFn    ErrorFunc
+
+	ch chan<- saveTreeJob
+	wg sync.WaitGroup
+}
+
+// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
+// started, it is stopped when ctx is cancelled.
+func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
+	ch := make(chan saveTreeJob)
+
+	s := &TreeSaver{
+		ch:       ch,
+		saveTree: saveTree,
+		errFn:    errFn,
+	}
+
+	for i := uint(0); i < treeWorkers; i++ {
+		s.wg.Add(1)
+		go s.worker(ctx, &s.wg, ch)
+	}
+
+	return s
+}
+
+// Save stores the dir d and returns the data once it has been completed.
+func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
+	ch := make(chan saveTreeResponse, 1)
+	s.ch <- saveTreeJob{
+		snPath: snPath,
+		node:   node,
+		nodes:  nodes,
+		ch:     ch,
+	}
+
+	return FutureTree{ch: ch}
+}
+
+type saveTreeJob struct {
+	snPath string
+	nodes  []FutureNode
+	node   *restic.Node
+	ch     chan<- saveTreeResponse
+}
+
+type saveTreeResponse struct {
+	node  *restic.Node
+	stats ItemStats
+	err   error
+}
+
+// save stores the nodes as a tree in the repo.
+func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) {
+	var stats ItemStats
+
+	tree := restic.NewTree()
+	for _, fn := range nodes {
+		fn.wait(ctx)
+
+		// return the error if it wasn't ignored
+		if fn.err != nil {
+			debug.Log("err for %v: %v", fn.node.Name, fn.err)
+			fn.err = s.errFn(fn.target, fn.fi, fn.err)
+			if fn.err == nil {
+				// ignore error
+				continue
+			}
+
+			return nil, stats, fn.err
+		}
+
+		// when the error is ignored, the node could not be saved, so ignore it
+		if fn.node == nil {
+			debug.Log("%v excluded: %v", fn.snPath, fn.target)
+			continue
+		}
+
+		debug.Log("insert %v", fn.node.Name)
+		err := tree.Insert(fn.node)
+		if err != nil {
+			return nil, stats, err
+		}
+	}
+
+	id, treeStats, err := s.saveTree(ctx, tree)
+	stats.Add(treeStats)
+	if err != nil {
+		return nil, stats, err
+	}
+
+	node.Subtree = &id
+	return node, stats, nil
+}
+
+func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
+	defer wg.Done()
+	for {
+		var job saveTreeJob
+		select {
+		case <-ctx.Done():
+			return
+		case job = <-jobs:
+		}
+
+		node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
+		job.ch <- saveTreeResponse{
+			node:  node,
+			stats: stats,
+			err:   err,
+		}
+		close(job.ch)
+	}
+}
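Editor's note: the caller-visible effect of [PATCH 10/12] is what the archiver_test.go hunk above shows — SaveDir hands the directory to the TreeSaver and returns a FutureTree right away, so sibling directories are assembled and uploaded concurrently, and the caller only blocks when it asks for the result. Roughly (taken from the test change, error handling trimmed):

ft, err := arch.SaveDir(ctx, "/", fi, dir, nil)
if err != nil {
	return err
}
// Each accessor waits for the TreeSaver worker to finish this directory.
node, stats, err := ft.Node(), ft.Stats(), ft.Err()

Note that FutureTree's accessors share one wait() that reads from a single-element channel: whichever of Node, Stats or Err runs first does the blocking, and later calls return the cached response.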
From 19b9c881ca9fc1a4c4473047b1f9f60d7dcfe768 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 30 Apr 2018 15:34:41 +0200
Subject: [PATCH 11/12] fs: Add O_NONBLOCK

---
 internal/fs/const.go | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/internal/fs/const.go b/internal/fs/const.go
index dfa6ad5f0..f5f65de8e 100644
--- a/internal/fs/const.go
+++ b/internal/fs/const.go
@@ -5,12 +5,13 @@ import "syscall"
 // Flags to OpenFile wrapping those of the underlying system. Not all flags may
 // be implemented on a given system.
 const (
-	O_RDONLY int = syscall.O_RDONLY // open the file read-only.
-	O_WRONLY int = syscall.O_WRONLY // open the file write-only.
-	O_RDWR   int = syscall.O_RDWR   // open the file read-write.
-	O_APPEND int = syscall.O_APPEND // append data to the file when writing.
-	O_CREATE int = syscall.O_CREAT  // create a new file if none exists.
-	O_EXCL   int = syscall.O_EXCL   // used with O_CREATE, file must not exist
-	O_SYNC   int = syscall.O_SYNC   // open for synchronous I/O.
-	O_TRUNC  int = syscall.O_TRUNC  // if possible, truncate file when opened.
+	O_RDONLY   int = syscall.O_RDONLY   // open the file read-only.
+	O_WRONLY   int = syscall.O_WRONLY   // open the file write-only.
+	O_RDWR     int = syscall.O_RDWR     // open the file read-write.
+	O_APPEND   int = syscall.O_APPEND   // append data to the file when writing.
+	O_CREATE   int = syscall.O_CREAT    // create a new file if none exists.
+	O_EXCL     int = syscall.O_EXCL     // used with O_CREATE, file must not exist
+	O_SYNC     int = syscall.O_SYNC     // open for synchronous I/O.
+	O_TRUNC    int = syscall.O_TRUNC    // if possible, truncate file when opened.
+	O_NONBLOCK int = syscall.O_NONBLOCK // don't block open on fifos etc.
 )

From c83c03ed63df305dfeedcc39bb56a83ff67e0e07 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Mon, 30 Apr 2018 15:31:08 +0200
Subject: [PATCH 12/12] archiver: Fix blocking on pipes

---
 internal/archiver/archiver.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 677976153..9970e45b3 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -334,7 +334,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		var fi os.FileInfo
 		var errFI error
 
-		file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0)
+		file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW|fs.O_NONBLOCK, 0)
 		if errOpen == nil {
 			fi, errFI = file.Stat()
 		}
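Editor's note: [PATCH 11/12] and [PATCH 12/12] belong together — opening a FIFO (named pipe) read-only blocks until some process opens it for writing, which could hang a backup that walks across one, so the archiver now opens files with O_NONBLOCK as well. A standalone illustration for Unix-like systems (assumed example, not restic code):

package main

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	const fifo = "/tmp/example.fifo"
	if err := syscall.Mkfifo(fifo, 0600); err != nil {
		fmt.Fprintln(os.Stderr, "mkfifo:", err)
		return
	}
	defer os.Remove(fifo)

	// Without O_NONBLOCK this open would block until a writer shows up;
	// with it, the caller can stat the file and move on.
	fd, err := syscall.Open(fifo, syscall.O_RDONLY|syscall.O_NONBLOCK, 0)
	if err != nil {
		fmt.Fprintln(os.Stderr, "open:", err)
		return
	}
	defer syscall.Close(fd)
	fmt.Println("opened FIFO without blocking")
}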