From 6f24d038f80865be269b3e37eb8a98477904b527 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Thu, 15 Jun 2017 13:12:46 +0200 Subject: [PATCH 1/3] prune: Only remove data after index has been uploaded Closes #1032 --- src/cmds/restic/cmd_prune.go | 10 +++++----- src/cmds/restic/cmd_rebuild_index.go | 6 +++--- src/restic/index/index.go | 4 ++-- src/restic/list/list.go | 6 +++++- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/src/cmds/restic/cmd_prune.go b/src/cmds/restic/cmd_prune.go index e74c73ce9..4705644c7 100644 --- a/src/cmds/restic/cmd_prune.go +++ b/src/cmds/restic/cmd_prune.go @@ -97,7 +97,7 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { Verbosef("building new index for repo\n") bar := newProgressMax(!gopts.Quiet, uint64(stats.packs), "packs") - idx, err := index.New(ctx, repo, bar) + idx, err := index.New(ctx, repo, restic.NewIDSet(), bar) if err != nil { return err } @@ -222,6 +222,10 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { bar.Done() } + if err = rebuildIndex(ctx, repo, removePacks); err != nil { + return err + } + if len(removePacks) != 0 { bar = newProgressMax(!gopts.Quiet, uint64(len(removePacks)), "packs deleted") bar.Start() @@ -236,10 +240,6 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { bar.Done() } - if err = rebuildIndex(ctx, repo); err != nil { - return err - } - Verbosef("done\n") return nil } diff --git a/src/cmds/restic/cmd_rebuild_index.go b/src/cmds/restic/cmd_rebuild_index.go index 6a60ea900..9f3cc888c 100644 --- a/src/cmds/restic/cmd_rebuild_index.go +++ b/src/cmds/restic/cmd_rebuild_index.go @@ -38,10 +38,10 @@ func runRebuildIndex(gopts GlobalOptions) error { ctx, cancel := context.WithCancel(gopts.ctx) defer cancel() - return rebuildIndex(ctx, repo) + return rebuildIndex(ctx, repo, restic.NewIDSet()) } -func rebuildIndex(ctx context.Context, repo restic.Repository) error { +func rebuildIndex(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet) error { Verbosef("counting files in repo\n") var packs uint64 @@ -50,7 +50,7 @@ func rebuildIndex(ctx context.Context, repo restic.Repository) error { } bar := newProgressMax(!globalOptions.Quiet, packs, "packs") - idx, err := index.New(ctx, repo, bar) + idx, err := index.New(ctx, repo, ignorePacks, bar) if err != nil { return err } diff --git a/src/restic/index/index.go b/src/restic/index/index.go index ab1ebafa4..be528efc0 100644 --- a/src/restic/index/index.go +++ b/src/restic/index/index.go @@ -34,12 +34,12 @@ func newIndex() *Index { } // New creates a new index for repo from scratch. -func New(ctx context.Context, repo restic.Repository, p *restic.Progress) (*Index, error) { +func New(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet, p *restic.Progress) (*Index, error) { p.Start() defer p.Done() ch := make(chan worker.Job) - go list.AllPacks(ctx, repo, ch) + go list.AllPacks(ctx, repo, ignorePacks, ch) idx := newIndex() diff --git a/src/restic/list/list.go b/src/restic/list/list.go index 292ba8475..6bd9d23aa 100644 --- a/src/restic/list/list.go +++ b/src/restic/list/list.go @@ -37,7 +37,7 @@ func (l Result) Entries() []restic.Blob { } // AllPacks sends the contents of all packs to ch. -func AllPacks(ctx context.Context, repo Lister, ch chan<- worker.Job) { +func AllPacks(ctx context.Context, repo Lister, ignorePacks restic.IDSet, ch chan<- worker.Job) { f := func(ctx context.Context, job worker.Job) (interface{}, error) { packID := job.Data.(restic.ID) entries, size, err := repo.ListPack(ctx, packID) @@ -55,6 +55,10 @@ func AllPacks(ctx context.Context, repo Lister, ch chan<- worker.Job) { go func() { defer close(jobCh) for id := range repo.List(ctx, restic.DataFile) { + if ignorePacks.Has(id) { + continue + } + select { case jobCh <- worker.Job{Data: id}: case <-ctx.Done(): From af9ba3be91deca4e0a731c1c333ac7d73c00dfaa Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Thu, 15 Jun 2017 13:40:27 +0200 Subject: [PATCH 2/3] backend: Add IsNotExist --- src/cmds/restic/cmd_prune.go | 4 ++++ src/restic/backend.go | 4 ++++ src/restic/backend/b2/b2.go | 5 +++++ src/restic/backend/local/local.go | 5 +++++ src/restic/backend/mem/mem_backend.go | 13 ++++++++--- src/restic/backend/rest/rest.go | 32 +++++++++++++++++++++++++++ src/restic/mock/backend.go | 28 +++++++++++++++-------- 7 files changed, 79 insertions(+), 12 deletions(-) diff --git a/src/cmds/restic/cmd_prune.go b/src/cmds/restic/cmd_prune.go index 4705644c7..71b83369e 100644 --- a/src/cmds/restic/cmd_prune.go +++ b/src/cmds/restic/cmd_prune.go @@ -152,6 +152,10 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { err = restic.FindUsedBlobs(ctx, repo, *sn.Tree, usedBlobs, seenBlobs) if err != nil { + if repo.Backend().IsNotExist(err) { + return errors.Fatal("unable to load a tree from the repo: " + err.Error()) + } + return err } diff --git a/src/restic/backend.go b/src/restic/backend.go index 0020a76a9..b3d91cc62 100644 --- a/src/restic/backend.go +++ b/src/restic/backend.go @@ -36,6 +36,10 @@ type Backend interface { // arbitrary order. A goroutine is started for this, which is stopped when // ctx is cancelled. List(ctx context.Context, t FileType) <-chan string + + // IsNotExist returns true if the error was caused by a non-existing file + // in the backend. + IsNotExist(err error) bool } // FileInfo is returned by Stat() and contains information about a file in the diff --git a/src/restic/backend/b2/b2.go b/src/restic/backend/b2/b2.go index c19915038..69ccb9bdc 100644 --- a/src/restic/backend/b2/b2.go +++ b/src/restic/backend/b2/b2.go @@ -151,6 +151,11 @@ func (wr *wrapReader) Close() error { return err } +// IsNotExist returns true if the error is caused by a non-existing file. +func (be *b2Backend) IsNotExist(err error) bool { + return b2.IsNotExist(errors.Cause(err)) +} + // Load returns the data stored in the backend for h at the given offset // and saves it in p. Load has the same semantics as io.ReaderAt. func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { diff --git a/src/restic/backend/local/local.go b/src/restic/backend/local/local.go index 1a3c0158c..4d58b187d 100644 --- a/src/restic/backend/local/local.go +++ b/src/restic/backend/local/local.go @@ -75,6 +75,11 @@ func (b *Local) Location() string { return b.Path } +// IsNotExist returns true if the error is caused by a non existing file. +func (b *Local) IsNotExist(err error) bool { + return os.IsNotExist(errors.Cause(err)) +} + // Save stores data in the backend at the handle. func (b *Local) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err error) { debug.Log("Save %v", h) diff --git a/src/restic/backend/mem/mem_backend.go b/src/restic/backend/mem/mem_backend.go index bbb4dbd1a..cee81799a 100644 --- a/src/restic/backend/mem/mem_backend.go +++ b/src/restic/backend/mem/mem_backend.go @@ -19,6 +19,8 @@ type memMap map[restic.Handle][]byte // make sure that MemoryBackend implements backend.Backend var _ restic.Backend = &MemoryBackend{} +var errNotFound = errors.New("not found") + // MemoryBackend is a mock backend that uses a map for storing all data in // memory. This should only be used for tests. type MemoryBackend struct { @@ -51,6 +53,11 @@ func (be *MemoryBackend) Test(ctx context.Context, h restic.Handle) (bool, error return false, nil } +// IsNotExist returns true if the file does not exist. +func (be *MemoryBackend) IsNotExist(err error) bool { + return errors.Cause(err) == errNotFound +} + // Save adds new Data to the backend. func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error { if err := h.Valid(); err != nil { @@ -101,7 +108,7 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, } if _, ok := be.data[h]; !ok { - return nil, errors.New("no such data") + return nil, errNotFound } buf := be.data[h] @@ -134,7 +141,7 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File e, ok := be.data[h] if !ok { - return restic.FileInfo{}, errors.New("no such data") + return restic.FileInfo{}, errNotFound } return restic.FileInfo{Size: int64(len(e))}, nil @@ -148,7 +155,7 @@ func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { debug.Log("Remove %v", h) if _, ok := be.data[h]; !ok { - return errors.New("no such data") + return errNotFound } delete(be.data, h) diff --git a/src/restic/backend/rest/rest.go b/src/restic/backend/rest/rest.go index 4145a2a32..99dc2ba63 100644 --- a/src/restic/backend/rest/rest.go +++ b/src/restic/backend/rest/rest.go @@ -138,6 +138,23 @@ func (b *restBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) ( return nil } +// ErrIsNotExist is returned whenever the requested file does not exist on the +// server. +type ErrIsNotExist struct { + restic.Handle +} + +func (e ErrIsNotExist) Error() string { + return fmt.Sprintf("%v does not exist", e.Handle) +} + +// IsNotExist returns true if the error was caused by a non-existing file. +func (b *restBackend) IsNotExist(err error) bool { + err = errors.Cause(err) + _, ok := err.(ErrIsNotExist) + return ok +} + // Load returns a reader that yields the contents of the file at h at the // given offset. If length is nonzero, only a portion of the file is // returned. rd must be closed after use. @@ -179,6 +196,11 @@ func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, off return nil, errors.Wrap(err, "client.Do") } + if resp.StatusCode == http.StatusNotFound { + _ = resp.Body.Close() + return nil, ErrIsNotExist{h} + } + if resp.StatusCode != 200 && resp.StatusCode != 206 { _ = resp.Body.Close() return nil, errors.Errorf("unexpected HTTP response (%v): %v", resp.StatusCode, resp.Status) @@ -205,6 +227,11 @@ func (b *restBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInf return restic.FileInfo{}, errors.Wrap(err, "Close") } + if resp.StatusCode == http.StatusNotFound { + _ = resp.Body.Close() + return restic.FileInfo{}, ErrIsNotExist{h} + } + if resp.StatusCode != 200 { return restic.FileInfo{}, errors.Errorf("unexpected HTTP response (%v): %v", resp.StatusCode, resp.Status) } @@ -248,6 +275,11 @@ func (b *restBackend) Remove(ctx context.Context, h restic.Handle) error { return errors.Wrap(err, "client.Do") } + if resp.StatusCode == http.StatusNotFound { + _ = resp.Body.Close() + return ErrIsNotExist{h} + } + if resp.StatusCode != 200 { return errors.Errorf("blob not removed, server response: %v (%v)", resp.Status, resp.StatusCode) } diff --git a/src/restic/mock/backend.go b/src/restic/mock/backend.go index 10effe045..ead50efcb 100644 --- a/src/restic/mock/backend.go +++ b/src/restic/mock/backend.go @@ -10,15 +10,16 @@ import ( // Backend implements a mock backend. type Backend struct { - CloseFn func() error - SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error - LoadFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) - StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) - ListFn func(ctx context.Context, t restic.FileType) <-chan string - RemoveFn func(ctx context.Context, h restic.Handle) error - TestFn func(ctx context.Context, h restic.Handle) (bool, error) - DeleteFn func(ctx context.Context) error - LocationFn func() string + CloseFn func() error + IsNotExistFn func(err error) bool + SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error + LoadFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) + StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) + ListFn func(ctx context.Context, t restic.FileType) <-chan string + RemoveFn func(ctx context.Context, h restic.Handle) error + TestFn func(ctx context.Context, h restic.Handle) (bool, error) + DeleteFn func(ctx context.Context) error + LocationFn func() string } // Close the backend. @@ -39,6 +40,15 @@ func (m *Backend) Location() string { return m.LocationFn() } +// IsNotExist returns true if the error is caused by a missing file. +func (m *Backend) IsNotExist(err error) bool { + if m.IsNotExistFn == nil { + return false + } + + return m.IsNotExistFn(err) +} + // Save data in the backend. func (m *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error { if m.SaveFn == nil { From c79fb6fcdd254382705753957bd299472696a2c6 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Thu, 15 Jun 2017 14:40:34 +0200 Subject: [PATCH 3/3] prune: Delete repacked files as the very last step --- src/cmds/restic/cmd_prune.go | 4 ++- src/restic/index/index_test.go | 10 +++--- src/restic/repository/repack.go | 48 +++++++++++----------------- src/restic/repository/repack_test.go | 11 +++++-- 4 files changed, 36 insertions(+), 37 deletions(-) diff --git a/src/cmds/restic/cmd_prune.go b/src/cmds/restic/cmd_prune.go index 71b83369e..e4441c8f3 100644 --- a/src/cmds/restic/cmd_prune.go +++ b/src/cmds/restic/cmd_prune.go @@ -216,10 +216,11 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { Verbosef("will delete %d packs and rewrite %d packs, this frees %s\n", len(removePacks), len(rewritePacks), formatBytes(uint64(removeBytes))) + var repackedBlobs restic.IDSet if len(rewritePacks) != 0 { bar = newProgressMax(!gopts.Quiet, uint64(len(rewritePacks)), "packs rewritten") bar.Start() - err = repository.Repack(ctx, repo, rewritePacks, usedBlobs, bar) + repackedBlobs, err = repository.Repack(ctx, repo, rewritePacks, usedBlobs, bar) if err != nil { return err } @@ -230,6 +231,7 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error { return err } + removePacks.Merge(repackedBlobs) if len(removePacks) != 0 { bar = newProgressMax(!gopts.Quiet, uint64(len(removePacks)), "packs deleted") bar.Start() diff --git a/src/restic/index/index_test.go b/src/restic/index/index_test.go index 11d0cc08a..81c3d6e4b 100644 --- a/src/restic/index/index_test.go +++ b/src/restic/index/index_test.go @@ -43,7 +43,7 @@ func TestIndexNew(t *testing.T) { repo, cleanup := createFilledRepo(t, 3, 0) defer cleanup() - idx, err := New(context.TODO(), repo, nil) + idx, err := New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { t.Fatalf("New() returned error %v", err) } @@ -70,7 +70,7 @@ func TestIndexLoad(t *testing.T) { validateIndex(t, repo, loadIdx) - newIdx, err := New(context.TODO(), repo, nil) + newIdx, err := New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { t.Fatalf("New() returned error %v", err) } @@ -134,7 +134,7 @@ func BenchmarkIndexNew(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - idx, err := New(context.TODO(), repo, nil) + idx, err := New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { b.Fatalf("New() returned error %v", err) @@ -151,7 +151,7 @@ func BenchmarkIndexSave(b *testing.B) { repo, cleanup := repository.TestRepository(b) defer cleanup() - idx, err := New(context.TODO(), repo, nil) + idx, err := New(context.TODO(), repo, restic.NewIDSet(), nil) test.OK(b, err) for i := 0; i < 8000; i++ { @@ -184,7 +184,7 @@ func TestIndexDuplicateBlobs(t *testing.T) { repo, cleanup := createFilledRepo(t, 3, 0.01) defer cleanup() - idx, err := New(context.TODO(), repo, nil) + idx, err := New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { t.Fatal(err) } diff --git a/src/restic/repository/repack.go b/src/restic/repository/repack.go index 36a000783..e613d8da2 100644 --- a/src/restic/repository/repack.go +++ b/src/restic/repository/repack.go @@ -16,9 +16,9 @@ import ( // Repack takes a list of packs together with a list of blobs contained in // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved -// into a new pack. Afterwards, the packs are removed. This operation requires -// an exclusive lock on the repo. -func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (err error) { +// into a new pack. Returned is the list of obsolete packs which can then +// be removed. +func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) { debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) for packID := range packs { @@ -27,39 +27,39 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee tempfile, err := fs.TempFile("", "restic-temp-repack-") if err != nil { - return errors.Wrap(err, "TempFile") + return nil, errors.Wrap(err, "TempFile") } beRd, err := repo.Backend().Load(ctx, h, 0, 0) if err != nil { - return err + return nil, err } hrd := hashing.NewReader(beRd, sha256.New()) packLength, err := io.Copy(tempfile, hrd) if err != nil { - return errors.Wrap(err, "Copy") + return nil, errors.Wrap(err, "Copy") } if err = beRd.Close(); err != nil { - return errors.Wrap(err, "Close") + return nil, errors.Wrap(err, "Close") } hash := restic.IDFromHash(hrd.Sum(nil)) debug.Log("pack %v loaded (%d bytes), hash %v", packID.Str(), packLength, hash.Str()) if !packID.Equal(hash) { - return errors.Errorf("hash does not match id: want %v, got %v", packID, hash) + return nil, errors.Errorf("hash does not match id: want %v, got %v", packID, hash) } _, err = tempfile.Seek(0, 0) if err != nil { - return errors.Wrap(err, "Seek") + return nil, errors.Wrap(err, "Seek") } blobs, err := pack.List(repo.Key(), tempfile, packLength) if err != nil { - return err + return nil, err } debug.Log("processing pack %v, blobs: %v", packID.Str(), len(blobs)) @@ -80,30 +80,30 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee n, err := tempfile.ReadAt(buf, int64(entry.Offset)) if err != nil { - return errors.Wrap(err, "ReadAt") + return nil, errors.Wrap(err, "ReadAt") } if n != len(buf) { - return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", + return nil, errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", h, tempfile.Name(), len(buf), n) } n, err = crypto.Decrypt(repo.Key(), buf, buf) if err != nil { - return err + return nil, err } buf = buf[:n] id := restic.Hash(buf) if !id.Equal(entry.ID) { - return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", + return nil, errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", h, tempfile.Name(), id) } _, err = repo.SaveBlob(ctx, entry.Type, buf, entry.ID) if err != nil { - return err + return nil, err } debug.Log(" saved blob %v", entry.ID.Str()) @@ -112,11 +112,11 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee } if err = tempfile.Close(); err != nil { - return errors.Wrap(err, "Close") + return nil, errors.Wrap(err, "Close") } if err = fs.RemoveIfExists(tempfile.Name()); err != nil { - return errors.Wrap(err, "Remove") + return nil, errors.Wrap(err, "Remove") } if p != nil { p.Report(restic.Stat{Blobs: 1}) @@ -124,18 +124,8 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee } if err := repo.Flush(); err != nil { - return err + return nil, err } - for packID := range packs { - h := restic.Handle{Type: restic.DataFile, Name: packID.String()} - err := repo.Backend().Remove(ctx, h) - if err != nil { - debug.Log("error removing pack %v: %v", packID.Str(), err) - return err - } - debug.Log("removed pack %v", packID.Str()) - } - - return nil + return packs, nil } diff --git a/src/restic/repository/repack_test.go b/src/restic/repository/repack_test.go index d339cf2b8..fa64f8eff 100644 --- a/src/restic/repository/repack_test.go +++ b/src/restic/repository/repack_test.go @@ -127,10 +127,17 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe } func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) { - err := repository.Repack(context.TODO(), repo, packs, blobs, nil) + repackedBlobs, err := repository.Repack(context.TODO(), repo, packs, blobs, nil) if err != nil { t.Fatal(err) } + + for id := range repackedBlobs { + err = repo.Backend().Remove(context.TODO(), restic.Handle{Type: restic.DataFile, Name: id.String()}) + if err != nil { + t.Fatal(err) + } + } } func saveIndex(t *testing.T, repo restic.Repository) { @@ -140,7 +147,7 @@ func saveIndex(t *testing.T, repo restic.Repository) { } func rebuildIndex(t *testing.T, repo restic.Repository) { - idx, err := index.New(context.TODO(), repo, nil) + idx, err := index.New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { t.Fatal(err) }