diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index b2b5e59bb..ae4879ff4 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -43,51 +43,18 @@ func (s *BlobSaver) TriggerShutdown() {
 
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. It takes ownership of the buffer passed in.
-func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
+func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
 	select {
-	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
+	case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled")
-		close(ch)
-		return FutureBlob{ch: ch}
 	}
-
-	return FutureBlob{ch: ch}
-}
-
-// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
-type FutureBlob struct {
-	ch <-chan SaveBlobResponse
-}
-
-func (s *FutureBlob) Poll() *SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return &res
-		}
-	default:
-	}
-	return nil
-}
-
-// Take blocks until the result is available or the context is cancelled.
-func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return res
-		}
-	case <-ctx.Done():
-	}
-	return SaveBlobResponse{}
 }
 
 type saveBlobJob struct {
 	restic.BlobType
 	buf *Buffer
-	ch  chan<- SaveBlobResponse
+	cb  func(res SaveBlobResponse)
 }
 
 type SaveBlobResponse struct {
@@ -128,11 +95,9 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
 		if err != nil {
 			debug.Log("saveBlob returned error, exiting: %v", err)
-			close(job.ch)
 			return err
 		}
-		job.ch <- res
-		close(job.ch)
+		job.cb(res)
 		job.buf.Release()
 	}
 }
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
index b25478b26..caa1e7c39 100644
--- a/internal/archiver/blob_saver_test.go
+++ b/internal/archiver/blob_saver_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
 
@@ -45,16 +46,22 @@ func TestBlobSaver(t *testing.T) {
 
 	b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
 
-	var results []FutureBlob
+	var wait sync.WaitGroup
+	var results []SaveBlobResponse
 
+	wait.Add(20)
 	for i := 0; i < 20; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		fb := b.Save(ctx, restic.DataBlob, buf)
-		results = append(results, fb)
+		idx := i
+		results = append(results, SaveBlobResponse{})
+		b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
+			results[idx] = res
+			wait.Done()
+		})
 	}
 
-	for i, blob := range results {
-		sbr := blob.Take(ctx)
+	wait.Wait()
+	for i, sbr := range results {
 		if sbr.known {
 			t.Errorf("blob %v is known, that should not be the case", i)
 		}
@@ -94,7 +101,7 @@ func TestBlobSaverError(t *testing.T) {
 
 			for i := 0; i < test.blobs; i++ {
 				buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-				b.Save(ctx, restic.DataBlob, buf)
+				b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
 			}
 
 			b.TriggerShutdown()
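Context for reviewers: the patch replaces the pull-style FutureBlob (Poll/Take) with a push-style completion callback, so a result is handled the moment a worker produces it instead of being drained later by the caller. The shape of the new API, and the sync.WaitGroup idiom the updated test uses to wait for all callbacks, boils down to the following standalone sketch (simplified stand-in types, not the actual restic API):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// saveBlobResponse stands in for archiver.SaveBlobResponse.
type saveBlobResponse struct{ id int }

type saveBlobJob struct {
	data []byte
	cb   func(saveBlobResponse) // push-style result delivery
}

type blobSaver struct{ ch chan saveBlobJob }

// save enqueues a job; a worker invokes cb once the blob is stored.
func (s *blobSaver) save(ctx context.Context, data []byte, cb func(saveBlobResponse)) {
	select {
	case s.ch <- saveBlobJob{data: data, cb: cb}:
	case <-ctx.Done():
		// job dropped on cancellation; cb is never invoked
	}
}

func main() {
	s := &blobSaver{ch: make(chan saveBlobJob)}

	// worker: replaces the per-future channel send of the old code
	go func() {
		id := 0
		for job := range s.ch {
			job.cb(saveBlobResponse{id: id})
			id++
		}
	}()

	// Callers that need all results synchronize with a WaitGroup,
	// mirroring the updated TestBlobSaver.
	var wait sync.WaitGroup
	results := make([]saveBlobResponse, 4)
	wait.Add(len(results))
	for i := range results {
		idx := i
		s.save(context.Background(), []byte("chunk"), func(res saveBlobResponse) {
			results[idx] = res
			wait.Done()
		})
	}
	wait.Wait()
	fmt.Println(results)
}
```

Note the semantic detail visible in the Save hunk above: when the context is cancelled before the job is enqueued, the callback is never invoked, so callers must not block unconditionally on it — TreeSaver below guards against exactly this with a select on ctx.Done().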
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index fc008945c..1c9352ef2 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"os"
+	"sync"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/debug"
@@ -14,7 +15,7 @@ import (
 )
 
 // SaveBlobFn saves a blob to a repo.
-type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
+type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
 
 // FileSaver concurrently saves incoming files to the repo.
 type FileSaver struct {
@@ -101,46 +102,67 @@ type saveFileJob struct {
 }
 
 // saveFile stores the file f in the repo, then closes it.
-func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
+func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finish func(res futureNodeResult)) {
 	start()
 
-	stats := ItemStats{}
 	fnr := futureNodeResult{
 		snPath: snPath,
 		target: target,
 	}
+	var lock sync.Mutex
+	remaining := 0
+	isCompleted := false
+
+	completeBlob := func() {
+		lock.Lock()
+		defer lock.Unlock()
+
+		remaining--
+		if remaining == 0 && fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			finish(fnr)
+		}
+	}
+	completeError := func(err error) {
+		lock.Lock()
+		defer lock.Unlock()
+
+		if fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			fnr.err = err
+			fnr.node = nil
+			fnr.stats = ItemStats{}
+			finish(fnr)
+		}
+	}
 
 	debug.Log("%v", snPath)
 
 	node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
 	if err != nil {
 		_ = f.Close()
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
 	}
 
 	if node.Type != "file" {
 		_ = f.Close()
-		fnr.err = errors.Errorf("node type %q is wrong", node.Type)
-		return fnr
+		completeError(errors.Errorf("node type %q is wrong", node.Type))
+		return
 	}
 
 	// reuse the chunker
 	chnker.Reset(f, s.pol)
 
-	var results []FutureBlob
-	complete := func(sbr SaveBlobResponse) {
-		if !sbr.known {
-			stats.DataBlobs++
-			stats.DataSize += uint64(sbr.length)
-			stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
-		}
-
-		node.Content = append(node.Content, sbr.id)
-	}
-
 	node.Content = []restic.ID{}
-	var size uint64
+	node.Size = 0
+	var idx int
 	for {
 		buf := s.saveFilePool.Get()
 		chunk, err := chnker.Next(buf.Data)
@@ -150,62 +172,62 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 		}
 
 		buf.Data = chunk.Data
-
-		size += uint64(chunk.Length)
+		node.Size += uint64(chunk.Length)
 
 		if err != nil {
 			_ = f.Close()
-			fnr.err = err
-			return fnr
+			completeError(err)
+			return
 		}
+		// test if the context has been cancelled, return the error
+		if ctx.Err() != nil {
+			_ = f.Close()
+			completeError(ctx.Err())
+			return
+		}
+
+		// add a place to store the saveBlob result
+		pos := idx
+		node.Content = append(node.Content, restic.ID{})
+
+		s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
+			lock.Lock()
+			if !sbr.known {
+				fnr.stats.DataBlobs++
+				fnr.stats.DataSize += uint64(sbr.length)
+				fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
+			}
+
+			node.Content[pos] = sbr.id
+			lock.Unlock()
+
+			completeBlob()
+		})
+		idx++
 
 		// test if the context has been cancelled, return the error
 		if ctx.Err() != nil {
 			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
-		}
-
-		res := s.saveBlob(ctx, restic.DataBlob, buf)
-		results = append(results, res)
-
-		// test if the context has been cancelled, return the error
-		if ctx.Err() != nil {
-			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
+			completeError(ctx.Err())
+			return
 		}
 
 		s.CompleteBlob(uint64(len(chunk.Data)))
-
-		// collect already completed blobs
-		for len(results) > 0 {
-			sbr := results[0].Poll()
-			if sbr == nil {
-				break
-			}
-			results[0] = FutureBlob{}
-			results = results[1:]
-			complete(*sbr)
-		}
 	}
 
 	err = f.Close()
 	if err != nil {
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
	}
 
-	for i, res := range results {
-		results[i] = FutureBlob{}
-		sbr := res.Take(ctx)
-		complete(sbr)
-	}
-
-	node.Size = size
 	fnr.node = node
-	fnr.stats = stats
-	return fnr
+	lock.Lock()
+	// require one additional completeBlob() call to ensure that the future only completes
+	// after reaching the end of this method
+	remaining += idx + 1
+	lock.Unlock()
+	completeBlob()
 }
 
 func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
@@ -224,11 +246,12 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
 			}
 		}
 
-		res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
-		if job.complete != nil {
-			job.complete(res.node, res.stats)
-		}
-		job.ch <- res
-		close(job.ch)
+		s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func(res futureNodeResult) {
+			if job.complete != nil {
+				job.complete(res.node, res.stats)
+			}
+			job.ch <- res
+			close(job.ch)
+		})
 	}
 }
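The subtle part of the reworked saveFile is its completion accounting: every saveBlob callback calls completeBlob(), which decrements remaining, while the tail of saveFile reserves one extra count (remaining += idx + 1 followed by a final completeBlob()) so finish(fnr) cannot fire until every chunk callback has run and saveFile itself has left the chunking loop. Callbacks that arrive early simply drive remaining negative, which is harmless: the count only has to end at zero. A stripped-down, runnable sketch of that pattern (hypothetical names; error handling omitted):

```go
package main

import (
	"fmt"
	"sync"
)

// completion mirrors saveFile's accounting: remaining counts outstanding
// callbacks plus one guard reference held by the producer; finish runs
// exactly once, when the count reaches zero.
type completion struct {
	mu        sync.Mutex
	remaining int
	done      bool
	finish    func()
}

func (c *completion) completeBlob() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.remaining--
	if c.remaining == 0 {
		if c.done {
			panic("completed twice")
		}
		c.done = true
		c.finish()
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	c := &completion{finish: func() {
		fmt.Println("file completed")
		wg.Done()
	}}

	const chunks = 5
	// Callbacks may fire before the producer is done enqueueing, driving
	// remaining negative; it never passes through zero prematurely.
	for i := 0; i < chunks; i++ {
		go c.completeBlob()
	}

	// Guard reference: one slot per callback plus one for the producer,
	// taken only after all chunks have been enqueued.
	c.mu.Lock()
	c.remaining += chunks + 1
	c.mu.Unlock()
	c.completeBlob()

	wg.Wait()
}
```

Before the producer's += chunks + 1, every observed count is negative; afterwards it is at least one until the last outstanding call, so finish fires exactly once, at the true end.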
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
index a311216c7..dde4356fc 100644
--- a/internal/archiver/file_saver_test.go
+++ b/internal/archiver/file_saver_test.go
@@ -34,10 +34,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
 func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
 	wg, ctx := errgroup.WithContext(ctx)
 
-	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
-		ch := make(chan SaveBlobResponse)
-		close(ch)
-		return FutureBlob{ch: ch}
+	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
+		cb(SaveBlobResponse{})
 	}
 
 	workers := uint(runtime.NumCPU())
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index 180f987cd..d25781b03 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -11,7 +11,7 @@ import (
 
 // TreeSaver concurrently saves incoming trees to the repo.
 type TreeSaver struct {
-	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob
+	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
 	errFn    ErrorFunc
 
 	ch chan<- saveTreeJob
@@ -19,7 +19,7 @@ type TreeSaver struct {
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
@@ -124,21 +124,24 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
 	}
 
 	b := &Buffer{Data: buf}
-	res := s.saveBlob(ctx, restic.TreeBlob, b)
+	ch := make(chan SaveBlobResponse, 1)
+	s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
+		ch <- res
+	})
 
-	sbr := res.Take(ctx)
-	if !sbr.known {
-		stats.TreeBlobs++
-		stats.TreeSize += uint64(sbr.length)
-		stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
-	}
-	// The context was canceled in the meantime, id might be invalid
-	if ctx.Err() != nil {
+	select {
+	case sbr := <-ch:
+		if !sbr.known {
+			stats.TreeBlobs++
+			stats.TreeSize += uint64(sbr.length)
+			stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
+		}
+
+		node.Subtree = &sbr.id
+		return node, stats, nil
+	case <-ctx.Done():
 		return nil, stats, ctx.Err()
 	}
-
-	node.Subtree = &sbr.id
-	return node, stats, nil
 }
 
 func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
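Unlike FileSaver, TreeSaver stays synchronous: it adapts the callback API back to blocking code through a buffered channel plus a select on ctx.Done(). The capacity-1 buffer matters — if cancellation wins the select, a late callback can still deposit its result without blocking the BlobSaver worker that invokes it. A minimal sketch of the bridge (saveBlob here is a hypothetical stand-in, not the restic function):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type saveBlobResponse struct{ id int }

// saveBlob is a hypothetical stand-in for the asynchronous callback API.
func saveBlob(data []byte, cb func(saveBlobResponse)) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulated repository write
		cb(saveBlobResponse{id: 42})
	}()
}

// saveTree blocks like TreeSaver.save: it parks the callback result in a
// one-slot channel and waits on either the result or cancellation.
func saveTree(ctx context.Context, data []byte) (saveBlobResponse, error) {
	// capacity 1: a late callback can still deliver after we give up,
	// without blocking the worker goroutine that invokes it
	ch := make(chan saveBlobResponse, 1)
	saveBlob(data, func(res saveBlobResponse) {
		ch <- res
	})

	select {
	case res := <-ch:
		return res, nil
	case <-ctx.Done():
		return saveBlobResponse{}, ctx.Err()
	}
}

func main() {
	res, err := saveTree(context.Background(), []byte("tree"))
	fmt.Println(res.id, err)
}
```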
diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go
index ab55a2742..7cc53346c 100644
--- a/internal/archiver/tree_saver_test.go
+++ b/internal/archiver/tree_saver_test.go
@@ -12,15 +12,13 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
-	ch <- SaveBlobResponse{
+func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
+	cb(SaveBlobResponse{
 		id:         restic.NewRandomID(),
 		known:      false,
 		length:     len(buf.Data),
 		sizeInRepo: len(buf.Data),
-	}
-	return FutureBlob{ch: ch}
+	})
 }
 
 func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {
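One property of the updated test helpers is worth calling out: both stubs (saveBlob in file_saver_test.go and treeSaveHelper above) now run the callback synchronously on the caller's goroutine, before the save function returns. That is a stricter schedule than production, where callbacks fire from worker goroutines, and it exercises the early-callback path sketched earlier (remaining going negative). A tiny standalone illustration (hypothetical types, not the restic helpers):

```go
package main

import (
	"context"
	"fmt"
)

type saveBlobResponse struct{ length int }

// stubSaveBlob, like the new test helpers, invokes the callback inline:
// it has already run by the time stubSaveBlob returns.
func stubSaveBlob(ctx context.Context, data []byte, cb func(saveBlobResponse)) {
	cb(saveBlobResponse{length: len(data)}) // deterministic fake result
}

func main() {
	stubSaveBlob(context.Background(), []byte("tree"), func(res saveBlobResponse) {
		fmt.Println("callback ran inline, length =", res.length)
	})
	// No extra synchronization needed: the callback completed before return.
}
```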