From d58ae433179dd77ec06e20c0c7cd3573dfe1b5d6 Mon Sep 17 00:00:00 2001 From: Igor Fedorenko Date: Tue, 16 Jan 2018 23:59:16 -0500 Subject: [PATCH] Reworked Backend.Load API to retry errors during ongoing download Signed-off-by: Igor Fedorenko --- changelog/0.8.2/pull-1560 | 3 + .../archiver/archiver_duplication_test.go | 4 +- internal/backend/azure/azure.go | 11 ++- internal/backend/b2/b2.go | 10 ++- internal/backend/backend_error.go | 6 +- internal/backend/backend_retry.go | 10 +-- internal/backend/backend_retry_test.go | 63 ++++++++++++++ internal/backend/gs/gs.go | 11 ++- internal/backend/local/local.go | 11 ++- internal/backend/mem/mem_backend.go | 12 ++- internal/backend/rest/rest.go | 11 ++- internal/backend/s3/s3.go | 11 ++- internal/backend/sftp/sftp.go | 11 ++- internal/backend/swift/swift.go | 11 ++- internal/backend/test/benchmarks.go | 45 ++++------ internal/backend/test/tests.go | 85 ++++++------------- internal/backend/utils.go | 39 +++++---- internal/backend/utils_test.go | 53 ++++++++++++ internal/cache/backend.go | 54 +++++++----- internal/checker/checker.go | 31 +++---- internal/checker/checker_test.go | 47 ++++------ internal/limiter/limiter_backend.go | 20 ++--- internal/mock/backend.go | 30 +++++-- internal/repository/repack.go | 31 ++++--- internal/restic/backend.go | 12 ++- internal/restic/readerat.go | 13 ++- 26 files changed, 388 insertions(+), 257 deletions(-) create mode 100644 changelog/0.8.2/pull-1560 diff --git a/changelog/0.8.2/pull-1560 b/changelog/0.8.2/pull-1560 new file mode 100644 index 000000000..8915cdacd --- /dev/null +++ b/changelog/0.8.2/pull-1560 @@ -0,0 +1,3 @@ +Enhancement: retry all repository file download errors + +https://github.com/restic/restic/pull/1560 diff --git a/internal/archiver/archiver_duplication_test.go b/internal/archiver/archiver_duplication_test.go index bdcecf0c6..69a2ce21c 100644 --- a/internal/archiver/archiver_duplication_test.go +++ b/internal/archiver/archiver_duplication_test.go @@ -38,13 +38,13 @@ func randomID() restic.ID { // forgetfulBackend returns a backend that forgets everything. func forgetfulBackend() restic.Backend { - be := &mock.Backend{} + be := mock.NewBackend() be.TestFn = func(ctx context.Context, h restic.Handle) (bool, error) { return false, nil } - be.LoadFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return nil, errors.New("not found") } diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index ed401c868..763b27d56 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -178,10 +178,13 @@ func (wr wrapReader) Close() error { return err } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index ec5bf8d9c..c77ef36de 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -142,9 +142,13 @@ func (be *b2Backend) IsNotExist(err error) bool { return b2.IsNotExist(errors.Cause(err)) } -// Load returns the data stored in the backend for h at the given offset -// and saves it in p. Load has the same semantics as io.ReaderAt. -func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/backend_error.go b/internal/backend/backend_error.go index 98d6c0125..ee4b68b9b 100644 --- a/internal/backend/backend_error.go +++ b/internal/backend/backend_error.go @@ -66,12 +66,12 @@ func (be *ErrorBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) // given offset. If length is larger than zero, only a portion of the file // is returned. rd must be closed after use. If an error is returned, the // ReadCloser must be nil. -func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { if be.fail(be.FailLoad) { - return nil, errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset) + return errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset) } - return be.Backend.Load(ctx, h, length, offset) + return be.Backend.Load(ctx, h, length, offset, consumer) } // Stat returns information about the File identified by h. diff --git a/internal/backend/backend_retry.go b/internal/backend/backend_retry.go index 7890c0f70..6e00c086f 100644 --- a/internal/backend/backend_retry.go +++ b/internal/backend/backend_retry.go @@ -88,15 +88,11 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) // given offset. If length is larger than zero, only a portion of the file // is returned. rd must be closed after use. If an error is returned, the // ReadCloser must be nil. -func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (rd io.ReadCloser, err error) { - err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset), +func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) { + return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset), func() error { - var innerError error - rd, innerError = be.Backend.Load(ctx, h, length, offset) - - return innerError + return be.Backend.Load(ctx, h, length, offset, consumer) }) - return rd, err } // Stat returns information about the File identified by h. diff --git a/internal/backend/backend_retry_test.go b/internal/backend/backend_retry_test.go index 8a829d218..6abd4cba2 100644 --- a/internal/backend/backend_retry_test.go +++ b/internal/backend/backend_retry_test.go @@ -123,3 +123,66 @@ func TestBackendListRetry(t *testing.T) { test.Equals(t, 2, retry) // assert retried once test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files } + +// failingReader returns an error after reading limit number of bytes +type failingReader struct { + data []byte + pos int + limit int +} + +func (r failingReader) Read(p []byte) (n int, err error) { + i := 0 + for ; i < len(p) && i+r.pos < r.limit; i++ { + p[i] = r.data[r.pos+i] + } + r.pos += i + if r.pos >= r.limit { + return i, errors.Errorf("reader reached limit of %d", r.limit) + } + return i, nil +} +func (r failingReader) Close() error { + return nil +} + +// closingReader adapts io.Reader to io.ReadCloser interface +type closingReader struct { + rd io.Reader +} + +func (r closingReader) Read(p []byte) (n int, err error) { + return r.rd.Read(p) +} +func (r closingReader) Close() error { + return nil +} + +func TestBackendLoadRetry(t *testing.T) { + data := test.Random(23, 1024) + limit := 100 + attempt := 0 + + be := mock.NewBackend() + be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + // returns failing reader on first invocation, good reader on subsequent invocations + attempt++ + if attempt > 1 { + return closingReader{rd: bytes.NewReader(data)}, nil + } + return failingReader{data: data, limit: limit}, nil + } + + retryBackend := RetryBackend{ + Backend: be, + } + + var buf []byte + err := retryBackend.Load(context.TODO(), restic.Handle{}, 0, 0, func(rd io.Reader) (err error) { + buf, err = ioutil.ReadAll(rd) + return err + }) + test.OK(t, err) + test.Equals(t, data, buf) + test.Equals(t, 2, attempt) +} diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index d273d3e71..2eb8ada8c 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -282,10 +282,13 @@ func (wr wrapReader) Close() error { return err } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index 8bf949f37..2f82e2a42 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -146,10 +146,13 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd io.Reader) error { return setNewFileMode(filename, backend.Modes.File) } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn) +} + +func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 576ff8140..a64ef774d 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "sync" + "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -85,10 +86,13 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader return nil } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { if err := h.Valid(); err != nil { return nil, err } diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index 1f60b1f2f..2f5b35675 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -166,10 +166,13 @@ func (b *restBackend) IsNotExist(err error) bool { return ok } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn) +} + +func (b *restBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index b33e76f64..135dfa17b 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -281,10 +281,13 @@ func (wr wrapReader) Close() error { return err } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index a0e20101a..58fe38c24 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -327,10 +327,13 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err err return errors.Wrap(r.c.Chmod(filename, backend.Modes.File), "Chmod") } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) +} + +func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 27df0d55a..b9b1a82e4 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -109,10 +109,13 @@ func (be *beSwift) Location() string { return be.container } -// Load returns a reader that yields the contents of the file at h at the -// given offset. If length is nonzero, only a portion of the file is -// returned. rd must be closed after use. -func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) +} + +func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { return nil, err diff --git a/internal/backend/test/benchmarks.go b/internal/backend/test/benchmarks.go index f7d06db3c..2c3dbff2e 100644 --- a/internal/backend/test/benchmarks.go +++ b/internal/backend/test/benchmarks.go @@ -42,20 +42,15 @@ func (s *Suite) BenchmarkLoadFile(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - rd, err := be.Load(context.TODO(), handle, 0, 0) + var n int + err := be.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) (ierr error) { + n, ierr = io.ReadFull(rd, buf) + return ierr + }) if err != nil { t.Fatal(err) } - n, err := io.ReadFull(rd, buf) - if err != nil { - t.Fatal(err) - } - - if err = rd.Close(); err != nil { - t.Fatalf("Close() returned error: %v", err) - } - if n != length { t.Fatalf("wrong number of bytes read: want %v, got %v", length, n) } @@ -84,20 +79,15 @@ func (s *Suite) BenchmarkLoadPartialFile(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - rd, err := be.Load(context.TODO(), handle, testLength, 0) + var n int + err := be.Load(context.TODO(), handle, testLength, 0, func(rd io.Reader) (ierr error) { + n, ierr = io.ReadFull(rd, buf) + return ierr + }) if err != nil { t.Fatal(err) } - n, err := io.ReadFull(rd, buf) - if err != nil { - t.Fatal(err) - } - - if err = rd.Close(); err != nil { - t.Fatalf("Close() returned error: %v", err) - } - if n != testLength { t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n) } @@ -128,20 +118,15 @@ func (s *Suite) BenchmarkLoadPartialFileOffset(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - rd, err := be.Load(context.TODO(), handle, testLength, int64(testOffset)) + var n int + err := be.Load(context.TODO(), handle, testLength, int64(testOffset), func(rd io.Reader) (ierr error) { + n, ierr = io.ReadFull(rd, buf) + return ierr + }) if err != nil { t.Fatal(err) } - n, err := io.ReadFull(rd, buf) - if err != nil { - t.Fatal(err) - } - - if err = rd.Close(); err != nil { - t.Fatalf("Close() returned error: %v", err) - } - if n != testLength { t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n) } diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index f8b321a74..e1717ed2b 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -115,13 +115,14 @@ func (s *Suite) TestLoad(t *testing.T) { b := s.open(t) defer s.close(t, b) - rd, err := b.Load(context.TODO(), restic.Handle{}, 0, 0) + noop := func(rd io.Reader) error { + return nil + } + + err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop) if err == nil { t.Fatalf("Load() did not return an error for invalid handle") } - if rd != nil { - _ = rd.Close() - } err = testLoad(b, restic.Handle{Type: restic.DataFile, Name: "foobar"}, 0, 0) if err == nil { @@ -141,13 +142,19 @@ func (s *Suite) TestLoad(t *testing.T) { t.Logf("saved %d bytes as %v", length, handle) - rd, err = b.Load(context.TODO(), handle, 100, -1) + err = b.Load(context.TODO(), handle, 100, -1, noop) if err == nil { t.Fatalf("Load() returned no error for negative offset!") } - if rd != nil { - t.Fatalf("Load() returned a non-nil reader for negative offset!") + err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error { + return errors.Errorf("deliberate error") + }) + if err == nil { + t.Fatalf("Load() did not propagate consumer error!") + } + if err.Error() != "deliberate error" { + t.Fatalf("Load() did not correctly propagate consumer error!") } loadTests := 50 @@ -176,63 +183,38 @@ func (s *Suite) TestLoad(t *testing.T) { d = d[:l] } - rd, err := b.Load(context.TODO(), handle, getlen, int64(o)) + var buf []byte + err := b.Load(context.TODO(), handle, getlen, int64(o), func(rd io.Reader) (ierr error) { + buf, ierr = ioutil.ReadAll(rd) + return ierr + }) if err != nil { t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Errorf("Load(%d, %d) returned unexpected error: %+v", l, o, err) continue } - buf, err := ioutil.ReadAll(rd) - if err != nil { - t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) - t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %+v", l, o, err) - if err = rd.Close(); err != nil { - t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err) - } - continue - } - if l == 0 && len(buf) != len(d) { t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, len(d), len(buf)) - if err = rd.Close(); err != nil { - t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err) - } continue } if l > 0 && l <= len(d) && len(buf) != l { t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf)) - if err = rd.Close(); err != nil { - t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err) - } continue } if l > len(d) && len(buf) != len(d) { t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf)) - if err = rd.Close(); err != nil { - t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err) - } continue } if !bytes.Equal(buf, d) { t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Errorf("Load(%d, %d) returned wrong bytes", l, o) - if err = rd.Close(); err != nil { - t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err) - } - continue - } - - err = rd.Close() - if err != nil { - t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) - t.Errorf("Load(%d, %d) rd.Close() returned unexpected error: %+v", l, o, err) continue } } @@ -647,17 +629,10 @@ func store(t testing.TB, b restic.Backend, tpe restic.FileType, data []byte) res // testLoad loads a blob (but discards its contents). func testLoad(b restic.Backend, h restic.Handle, length int, offset int64) error { - rd, err := b.Load(context.TODO(), h, 0, 0) - if err != nil { - return err - } - - _, err = io.Copy(ioutil.Discard, rd) - cerr := rd.Close() - if err == nil { - err = cerr - } - return err + return b.Load(context.TODO(), h, 0, 0, func(rd io.Reader) (ierr error) { + _, ierr = io.Copy(ioutil.Discard, rd) + return ierr + }) } func (s *Suite) delayedRemove(t testing.TB, be restic.Backend, handles ...restic.Handle) error { @@ -776,18 +751,14 @@ func (s *Suite) TestBackend(t *testing.T) { length := end - start buf2 := make([]byte, length) - rd, err := b.Load(context.TODO(), h, len(buf2), int64(start)) + var n int + err = b.Load(context.TODO(), h, len(buf2), int64(start), func(rd io.Reader) (ierr error) { + n, ierr = io.ReadFull(rd, buf2) + return ierr + }) test.OK(t, err) - n, err := io.ReadFull(rd, buf2) test.OK(t, err) test.Equals(t, len(buf2), n) - - remaining, err := io.Copy(ioutil.Discard, rd) - test.OK(t, err) - test.Equals(t, int64(0), remaining) - - test.OK(t, rd.Close()) - test.Equals(t, ts.data[start:end], string(buf2)) } diff --git a/internal/backend/utils.go b/internal/backend/utils.go index 6d1871e63..222f210e5 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -10,24 +10,11 @@ import ( // LoadAll reads all data stored in the backend for the handle. func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) { - rd, err := be.Load(ctx, h, 0, 0) - if err != nil { - return nil, err - } - - defer func() { - _, e := io.Copy(ioutil.Discard, rd) - if err == nil { - err = e - } - - e = rd.Close() - if err == nil { - err = e - } - }() - - return ioutil.ReadAll(rd) + err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) { + buf, ierr = ioutil.ReadAll(rd) + return ierr + }) + return buf, err } // LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. @@ -46,3 +33,19 @@ func (l *LimitedReadCloser) Read(p []byte) (int, error) { func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { return &LimitedReadCloser{ReadCloser: r, Reader: io.LimitReader(r, n)} } + +// DefaultLoad implements Backend.Load using lower-level openReader func +func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, + openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error), + fn func(rd io.Reader) error) error { + rd, err := openReader(ctx, h, length, offset) + if err != nil { + return err + } + err = fn(rd) + if err != nil { + rd.Close() // ignore secondary errors closing the reader + return err + } + return rd.Close() +} diff --git a/internal/backend/utils_test.go b/internal/backend/utils_test.go index b281588fc..ed7488a57 100644 --- a/internal/backend/utils_test.go +++ b/internal/backend/utils_test.go @@ -3,11 +3,13 @@ package backend_test import ( "bytes" "context" + "io" "math/rand" "testing" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/mem" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -89,3 +91,54 @@ func TestLoadLargeBuffer(t *testing.T) { } } } + +type mockReader struct { + closed bool +} + +func (rd *mockReader) Read(p []byte) (n int, err error) { + return 0, nil +} +func (rd *mockReader) Close() error { + rd.closed = true + return nil +} + +func TestDefaultLoad(t *testing.T) { + + h := restic.Handle{Name: "id", Type: restic.DataFile} + rd := &mockReader{} + + // happy case, assert correct parameters are passed around and content stream is closed + err := backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) { + rtest.Equals(t, h, ih) + rtest.Equals(t, int(10), length) + rtest.Equals(t, int64(11), offset) + + return rd, nil + }, func(ird io.Reader) error { + rtest.Equals(t, rd, ird) + return nil + }) + rtest.OK(t, err) + rtest.Equals(t, true, rd.closed) + + // unhappy case, assert producer errors are handled correctly + err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) { + return nil, errors.Errorf("producer error") + }, func(ird io.Reader) error { + t.Fatalf("unexpected consumer invocation") + return nil + }) + rtest.Equals(t, "producer error", err.Error()) + + // unhappy case, assert consumer errors are handled correctly + rd = &mockReader{} + err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) { + return rd, nil + }, func(ird io.Reader) error { + return errors.Errorf("consumer error") + }) + rtest.Equals(t, true, rd.closed) + rtest.Equals(t, "consumer error", err.Error()) +} diff --git a/internal/cache/backend.go b/internal/cache/backend.go index 827a25341..08639e462 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -121,17 +121,10 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { return nil } - rd, err := b.Backend.Load(ctx, h, 0, 0) + err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error { + return b.Cache.Save(h, rd) + }) if err != nil { - return err - } - - if err = b.Cache.Save(h, rd); err != nil { - _ = rd.Close() - return err - } - - if err = rd.Close(); err != nil { // try to remove from the cache, ignore errors _ = b.Cache.Remove(h) return err @@ -142,17 +135,22 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { // loadFromCacheOrDelegate will try to load the file from the cache, and fall // back to the backend if that fails. -func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { rd, err := b.Cache.Load(h, length, offset) - if err == nil { - return rd, nil + if err != nil { + return b.Backend.Load(ctx, h, length, offset, consumer) } - return b.Backend.Load(ctx, h, length, offset) + err = consumer(rd) + if err != nil { + rd.Close() // ignore secondary errors + return err + } + return rd.Close() } // Load loads a file from the cache or the backend. -func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { +func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { b.inProgressMutex.Lock() waitForFinish, inProgress := b.inProgress[h] b.inProgressMutex.Unlock() @@ -167,7 +165,12 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset debug.Log("Load(%v, %v, %v) from cache", h, length, offset) rd, err := b.Cache.Load(h, length, offset) if err == nil { - return rd, nil + err = consumer(rd) + if err != nil { + rd.Close() // ignore secondary errors + return err + } + return rd.Close() } debug.Log("error loading %v from cache: %v", h, err) } @@ -179,20 +182,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset err := b.cacheFile(ctx, h) if err == nil { - return b.loadFromCacheOrDelegate(ctx, h, length, offset) + return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer) } debug.Log("error caching %v: %v", h, err) } debug.Log("Load(%v, %v, %v): partial file requested, delegating to backend", h, length, offset) - return b.Backend.Load(ctx, h, length, offset) + return b.Backend.Load(ctx, h, length, offset, consumer) } // if we don't automatically cache this file type, fall back to the backend if _, ok := autoCacheFiles[h.Type]; !ok { debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset) - return b.Backend.Load(ctx, h, length, offset) + return b.Backend.Load(ctx, h, length, offset, consumer) } debug.Log("auto-store %v in the cache", h) @@ -200,11 +203,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset if err == nil { // load the cached version - return b.Cache.Load(h, 0, 0) + rd, err := b.Cache.Load(h, 0, 0) + if err != nil { + return err + } + err = consumer(rd) + if err != nil { + rd.Close() // ignore secondary errors + return err + } + return rd.Close() } debug.Log("error caching %v: %v, falling back to backend", h, err) - return b.Backend.Load(ctx, h, length, offset) + return b.Backend.Load(ctx, h, length, offset, consumer) } // Stat tests whether the backend has a file. If it does not exist but still diff --git a/internal/checker/checker.go b/internal/checker/checker.go index f1b94a170..5d75bcb2f 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -630,14 +630,8 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { debug.Log("checking pack %v", id) h := restic.Handle{Type: restic.DataFile, Name: id.String()} - rd, err := r.Backend().Load(ctx, h, 0, 0) - if err != nil { - return err - } - packfile, err := fs.TempFile("", "restic-temp-check-") if err != nil { - _ = rd.Close() return errors.Wrap(err, "TempFile") } @@ -646,18 +640,25 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { _ = os.Remove(packfile.Name()) }() - hrd := hashing.NewReader(rd, sha256.New()) - size, err := io.Copy(packfile, hrd) + var hash restic.ID + var size int64 + err = r.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) { + _, ierr = packfile.Seek(0, io.SeekStart) + if ierr == nil { + ierr = packfile.Truncate(0) + } + if ierr != nil { + return ierr + } + hrd := hashing.NewReader(rd, sha256.New()) + size, ierr = io.Copy(packfile, hrd) + hash = restic.IDFromHash(hrd.Sum(nil)) + return ierr + }) if err != nil { - _ = rd.Close() - return errors.Wrap(err, "Copy") + return errors.Wrap(err, "checkPack") } - if err = rd.Close(); err != nil { - return err - } - - hash := restic.IDFromHash(hrd.Sum(nil)) debug.Log("hash for pack %v is %v", id, hash) if !hash.Equal(id) { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index d91c95054..f1b0fe938 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -195,20 +195,17 @@ func TestModifiedIndex(t *testing.T) { Type: restic.IndexFile, Name: "90f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd", } - f, err := repo.Backend().Load(context.TODO(), h, 0, 0) + err := repo.Backend().Load(context.TODO(), h, 0, 0, func(rd io.Reader) error { + // save the index again with a modified name so that the hash doesn't match + // the content any more + h2 := restic.Handle{ + Type: restic.IndexFile, + Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd", + } + return repo.Backend().Save(context.TODO(), h2, rd) + }) test.OK(t, err) - // save the index again with a modified name so that the hash doesn't match - // the content any more - h2 := restic.Handle{ - Type: restic.IndexFile, - Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd", - } - err = repo.Backend().Save(context.TODO(), h2, f) - test.OK(t, err) - - test.OK(t, f.Close()) - chkr := checker.New(repo) hints, errs := chkr.LoadIndex(context.TODO()) if len(errs) == 0 { @@ -262,35 +259,27 @@ type errorBackend struct { ProduceErrors bool } -func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - rd, err := b.Backend.Load(ctx, h, length, offset) - if err != nil { - return rd, err - } - - if b.ProduceErrors { - return errorReadCloser{rd}, err - } - - return rd, nil +func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { + return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error { + if b.ProduceErrors { + return consumer(errorReadCloser{rd}) + } + return consumer(rd) + }) } type errorReadCloser struct { - io.ReadCloser + io.Reader } func (erd errorReadCloser) Read(p []byte) (int, error) { - n, err := erd.ReadCloser.Read(p) + n, err := erd.Reader.Read(p) if n > 0 { induceError(p[:n]) } return n, err } -func (erd errorReadCloser) Close() error { - return erd.ReadCloser.Close() -} - // induceError flips a bit in the slice. func induceError(data []byte) { if rand.Float32() < 0.2 { diff --git a/internal/limiter/limiter_backend.go b/internal/limiter/limiter_backend.go index 58b26f199..963a084dd 100644 --- a/internal/limiter/limiter_backend.go +++ b/internal/limiter/limiter_backend.go @@ -25,16 +25,13 @@ func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Rea return r.Backend.Save(ctx, h, r.limiter.Upstream(rd)) } -func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - rc, err := r.Backend.Load(ctx, h, length, offset) - if err != nil { - return nil, err - } - - return limitedReadCloser{ - original: rc, - limited: r.limiter.Downstream(rc), - }, nil +func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error { + return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error { + lrd := limitedReadCloser{ + limited: r.limiter.Downstream(rd), + } + return consumer(lrd) + }) } type limitedReadCloser struct { @@ -47,6 +44,9 @@ func (l limitedReadCloser) Read(b []byte) (n int, err error) { } func (l limitedReadCloser) Close() error { + if l.original == nil { + return nil + } return l.original.Close() } diff --git a/internal/mock/backend.go b/internal/mock/backend.go index 29543c5fe..14288c3f6 100644 --- a/internal/mock/backend.go +++ b/internal/mock/backend.go @@ -13,7 +13,7 @@ type Backend struct { CloseFn func() error IsNotExistFn func(err error) bool SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error - LoadFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) + OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error RemoveFn func(ctx context.Context, h restic.Handle) error @@ -22,6 +22,12 @@ type Backend struct { LocationFn func() string } +// NewBackend returns new mock Backend instance +func NewBackend() *Backend { + be := &Backend{} + return be +} + // Close the backend. func (m *Backend) Close() error { if m.CloseFn == nil { @@ -58,13 +64,27 @@ func (m *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error return m.SaveFn(ctx, h, rd) } -// Load loads data from the backend. -func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - if m.LoadFn == nil { +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + rd, err := m.openReader(ctx, h, length, offset) + if err != nil { + return err + } + err = fn(rd) + if err != nil { + rd.Close() // ignore secondary errors closing the reader + return err + } + return rd.Close() +} + +func (m *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + if m.OpenReaderFn == nil { return nil, errors.New("not implemented") } - return m.LoadFn(ctx, h, length, offset) + return m.OpenReaderFn(ctx, h, length, offset) } // Stat an object in the backend. diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 3572b9411..4bcd232d1 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -32,23 +32,26 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee return nil, errors.Wrap(err, "TempFile") } - beRd, err := repo.Backend().Load(ctx, h, 0, 0) + // TODO very similar code in checker, consider moving to utils.go + var hash restic.ID + var packLength int64 + err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) { + _, ierr = tempfile.Seek(0, io.SeekStart) + if ierr == nil { + ierr = tempfile.Truncate(0) + } + if ierr != nil { + return ierr + } + hrd := hashing.NewReader(rd, sha256.New()) + packLength, ierr = io.Copy(tempfile, hrd) + hash = restic.IDFromHash(hrd.Sum(nil)) + return ierr + }) if err != nil { - return nil, err + return nil, errors.Wrap(err, "Repack") } - hrd := hashing.NewReader(beRd, sha256.New()) - packLength, err := io.Copy(tempfile, hrd) - if err != nil { - _ = beRd.Close() - return nil, errors.Wrap(err, "Copy") - } - - if err = beRd.Close(); err != nil { - return nil, errors.Wrap(err, "Close") - } - - hash := restic.IDFromHash(hrd.Sum(nil)) debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) if !packID.Equal(hash) { diff --git a/internal/restic/backend.go b/internal/restic/backend.go index ea253975c..c5dc25087 100644 --- a/internal/restic/backend.go +++ b/internal/restic/backend.go @@ -23,11 +23,15 @@ type Backend interface { // Save stores the data in the backend under the given handle. Save(ctx context.Context, h Handle, rd io.Reader) error - // Load returns a reader that yields the contents of the file at h at the + // Load runs fn with a reader that yields the contents of the file at h at the // given offset. If length is larger than zero, only a portion of the file - // is returned. rd must be closed after use. If an error is returned, the - // ReadCloser must be nil. - Load(ctx context.Context, h Handle, length int, offset int64) (io.ReadCloser, error) + // is read. + // + // The function fn may be called multiple times during the same Load invocation + // and therefore must be idempotent. + // + // Implementations are encouraged to use backend.DefaultLoad + Load(ctx context.Context, h Handle, length int, offset int64, fn func(rd io.Reader) error) error // Stat returns information about the File identified by h. Stat(ctx context.Context, h Handle) (FileInfo, error) diff --git a/internal/restic/readerat.go b/internal/restic/readerat.go index b03fa4687..6e945b43a 100644 --- a/internal/restic/readerat.go +++ b/internal/restic/readerat.go @@ -25,17 +25,16 @@ func ReaderAt(be Backend, h Handle) io.ReaderAt { // ReadAt reads from the backend handle h at the given position. func ReadAt(ctx context.Context, be Backend, h Handle, offset int64, p []byte) (n int, err error) { debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p)) - rd, err := be.Load(ctx, h, len(p), offset) + + err = be.Load(ctx, h, len(p), offset, func(rd io.Reader) (ierr error) { + n, ierr = io.ReadFull(rd, p) + + return ierr + }) if err != nil { return 0, err } - n, err = io.ReadFull(rd, p) - e := rd.Close() - if err == nil { - err = e - } - debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n) return n, errors.Wrapf(err, "ReadFull(%v)", h)