From 91906911b0b2d66b62603c9134db82aa4285a418 Mon Sep 17 00:00:00 2001
From: Alexander Weiss
Date: Sat, 6 Jun 2020 22:20:44 +0200
Subject: [PATCH] Fix non-intuitive repository behavior

- The SaveBlob method now checks for duplicates.
- Handling of pending blobs is moved to MasterIndex.
  -> pending index entries are cleaned up once they are saved in the index
  -> callers of SaveBlob no longer need to care about the index
- Always check for a full index and save it when storing packs.
  -> removes the need for an index uploader
  -> removes the verbose "uploaded intermediate index" messages
- The Flush method now also saves the index.
- Fix a race condition when checking and saving full/non-finalized indexes.
---
 changelog/unreleased/pull-2773          |  8 +++
 cmd/restic/cmd_backup.go                | 18 -------
 cmd/restic/cmd_recover.go               |  5 --
 internal/archiver/archiver.go           |  5 --
 internal/archiver/archiver_test.go      | 35 ++++--------
 internal/archiver/blob_saver.go         | 48 +++--------------
 internal/archiver/blob_saver_test.go    |  6 +--
 internal/archiver/index_uploader.go     | 53 ------------------
 internal/repository/index.go            | 28 +++++++---
 internal/repository/index_test.go       |  5 +-
 internal/repository/master_index.go     | 60 +++++++++++++++----
 internal/repository/packer_manager.go   | 17 ++----
 internal/repository/repack.go           |  3 +-
 internal/repository/repack_test.go      | 13 +++--
 internal/repository/repository.go       | 74 +++++++++++++++-----------
 internal/repository/repository_test.go  | 20 +++----
 internal/restic/repository.go           |  2 +-
 internal/restic/testing.go              | 17 +-----
 internal/restorer/restorer_test.go      |  7 +--
 19 files changed, 171 insertions(+), 253 deletions(-)
 create mode 100644 changelog/unreleased/pull-2773
 delete mode 100644 internal/archiver/index_uploader.go

diff --git a/changelog/unreleased/pull-2773 b/changelog/unreleased/pull-2773
new file mode 100644
index 000000000..800583f94
--- /dev/null
+++ b/changelog/unreleased/pull-2773
@@ -0,0 +1,8 @@
+Enhancement: Optimize repository implementation
+
+We've optimized how index entries are handled in the repository.
+
+Restic now uses less memory for backups that add a lot of data, e.g. large initial backups.
+It is also more stable in some edge cases and allows for easier future enhancements.
+ +https://github.com/restic/restic/pull/2773 diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 4debeb13b..bbabf91d0 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -575,24 +575,6 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina ParentSnapshot: *parentSnapshotID, } - uploader := archiver.IndexUploader{ - Repository: repo, - Start: func() { - if !gopts.JSON { - p.VV("uploading intermediate index") - } - }, - Complete: func(id restic.ID) { - if !gopts.JSON { - p.V("uploaded intermediate index %v", id.Str()) - } - }, - } - - t.Go(func() error { - return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second) - }) - if !gopts.JSON { p.V("start backup on %v", targets) } diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 2a06f301f..9cdfac8cd 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -130,11 +130,6 @@ func runRecover(gopts GlobalOptions) error { return errors.Fatalf("unable to save blobs to the repo: %v", err) } - err = repo.SaveIndex(gopts.ctx) - if err != nil { - return errors.Fatalf("unable to save new index to the repo: %v", err) - } - sn, err := restic.NewSnapshot([]string{"/recover"}, []string{}, hostname, time.Now()) if err != nil { return errors.Fatalf("unable to save snapshot: %v", err) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 2565ba677..bc1432c28 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -783,11 +783,6 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return nil, restic.ID{}, err } - err = arch.Repo.SaveIndex(ctx) - if err != nil { - return nil, restic.ID{}, err - } - sn, err := restic.NewSnapshot(targets, opts.Tags, opts.Hostname, opts.Time) if err != nil { return nil, restic.ID{}, err diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 34bf94824..76c814bc3 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -96,11 +96,6 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(err) } - err = repo.SaveIndex(ctx) - if err != nil { - t.Fatal(err) - } - if !startCallback { t.Errorf("start callback did not happen") } @@ -418,13 +413,16 @@ type blobCountingRepo struct { saved map[restic.BlobHandle]uint } -func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { - id, err := repo.Repository.SaveBlob(ctx, t, buf, id) +func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) { + id, exists, err := repo.Repository.SaveBlob(ctx, t, buf, id, false) + if exists { + return id, exists, err + } h := restic.BlobHandle{ID: id, Type: t} repo.m.Lock() repo.saved[h]++ repo.m.Unlock() - return id, err + return id, exists, err } func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) { @@ -853,11 +851,6 @@ func TestArchiverSaveDir(t *testing.T) { t.Fatal(err) } - err = repo.SaveIndex(ctx) - if err != nil { - t.Fatal(err) - } - want := test.want if want == nil { want = test.src @@ -946,11 +939,6 @@ func TestArchiverSaveDirIncremental(t *testing.T) { t.Fatal(err) } - err = repo.SaveIndex(ctx) - if err != nil { - t.Fatal(err) - } - for h, n := range repo.saved { if n > 1 { t.Errorf("iteration %v: blob %v saved more than once (%d times)", i, h, n) @@ 
-1085,11 +1073,6 @@ func TestArchiverSaveTree(t *testing.T) { t.Fatal(err) } - err = repo.SaveIndex(ctx) - if err != nil { - t.Fatal(err) - } - want := test.want if want == nil { want = test.src @@ -1841,13 +1824,13 @@ type failSaveRepo struct { err error } -func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { +func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) { val := atomic.AddInt32(&f.cnt, 1) if val >= f.failAfter { - return restic.ID{}, f.err + return restic.ID{}, false, f.err } - return f.Repository.SaveBlob(ctx, t, buf, id) + return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate) } func TestArchiverAbortEarlyOnError(t *testing.T) { diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 45db04f46..89599b149 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -2,7 +2,6 @@ package archiver import ( "context" - "sync" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -11,18 +10,14 @@ import ( // Saver allows saving a blob. type Saver interface { - SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error) + SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, error) Index() restic.Index } // BlobSaver concurrently saves incoming blobs to the repo. type BlobSaver struct { repo Saver - - m sync.Mutex - knownBlobs restic.BlobSet - - ch chan<- saveBlobJob + ch chan<- saveBlobJob } // NewBlobSaver returns a new blob. A worker pool is started, it is stopped @@ -30,9 +25,8 @@ type BlobSaver struct { func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver { ch := make(chan saveBlobJob) s := &BlobSaver{ - repo: repo, - knownBlobs: restic.NewBlobSet(), - ch: ch, + repo: repo, + ch: ch, } for i := uint(0); i < workers; i++ { @@ -106,45 +100,15 @@ type saveBlobResponse struct { } func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { - id := restic.Hash(buf) - h := restic.BlobHandle{ID: id, Type: t} + id, known, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false) - // check if another goroutine has already saved this blob - known := false - s.m.Lock() - if s.knownBlobs.Has(h) { - known = true - } else { - s.knownBlobs.Insert(h) - known = false - } - s.m.Unlock() - - // blob is already known, nothing to do - if known { - return saveBlobResponse{ - id: id, - known: true, - }, nil - } - - // check if the repo knows this blob - if s.repo.Index().Has(id, t) { - return saveBlobResponse{ - id: id, - known: true, - }, nil - } - - // otherwise we're responsible for saving it - _, err := s.repo.SaveBlob(ctx, t, buf, id) if err != nil { return saveBlobResponse{}, err } return saveBlobResponse{ id: id, - known: false, + known: known, }, nil } diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index f3b0e1a68..18832ae1b 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -21,13 +21,13 @@ type saveFail struct { failAt int32 } -func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { +func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicates bool) (restic.ID, bool, error) { 
val := atomic.AddInt32(&b.cnt, 1) if val == b.failAt { - return restic.ID{}, errTest + return restic.ID{}, false, errTest } - return id, nil + return id, false, nil } func (b *saveFail) Index() restic.Index { diff --git a/internal/archiver/index_uploader.go b/internal/archiver/index_uploader.go deleted file mode 100644 index c6edb7a01..000000000 --- a/internal/archiver/index_uploader.go +++ /dev/null @@ -1,53 +0,0 @@ -package archiver - -import ( - "context" - "time" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/repository" - "github.com/restic/restic/internal/restic" -) - -// IndexUploader polls the repo for full indexes and uploads them. -type IndexUploader struct { - restic.Repository - - // Start is called when an index is to be uploaded. - Start func() - - // Complete is called when uploading an index has finished. - Complete func(id restic.ID) -} - -// Upload periodically uploads full indexes to the repo. When shutdown is -// cancelled, the last index upload will finish and then Upload returns. -func (u IndexUploader) Upload(ctx, shutdown context.Context, interval time.Duration) error { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil - case <-shutdown.Done(): - return nil - case <-ticker.C: - full := u.Repository.Index().(*repository.MasterIndex).FullIndexes() - for _, idx := range full { - if u.Start != nil { - u.Start() - } - - id, err := repository.SaveIndex(ctx, u.Repository, idx) - if err != nil { - debug.Log("save indexes returned an error: %v", err) - return err - } - if u.Complete != nil { - u.Complete(id) - } - } - } - } -} diff --git a/internal/repository/index.go b/internal/repository/index.go index 32d00882a..dabac0646 100644 --- a/internal/repository/index.go +++ b/internal/repository/index.go @@ -94,8 +94,7 @@ var IndexFull = func(idx *Index) bool { return false } -// Store remembers the id and pack in the index. An existing entry will be -// silently overwritten. +// Store remembers the id and pack in the index. func (idx *Index) Store(blob restic.PackedBlob) { idx.m.Lock() defer idx.m.Unlock() @@ -109,6 +108,23 @@ func (idx *Index) Store(blob restic.PackedBlob) { idx.store(blob) } +// StorePack remembers the ids of all blobs of a given pack +// in the index +func (idx *Index) StorePack(id restic.ID, blobs []restic.Blob) { + idx.m.Lock() + defer idx.m.Unlock() + + if idx.final { + panic("store new item in finalized index") + } + + debug.Log("%v", blobs) + + for _, blob := range blobs { + idx.store(restic.PackedBlob{Blob: blob, PackID: id}) + } +} + // Lookup queries the index for the blob ID and returns a restic.PackedBlob. func (idx *Index) Lookup(id restic.ID, tpe restic.BlobType) (blobs []restic.PackedBlob, found bool) { idx.m.Lock() @@ -360,15 +376,13 @@ func (idx *Index) encode(w io.Writer) error { return enc.Encode(idxJSON) } -// Finalize sets the index to final and writes the JSON serialization to w. -func (idx *Index) Finalize(w io.Writer) error { - debug.Log("encoding index") +// Finalize sets the index to final. +func (idx *Index) Finalize() { + debug.Log("finalizing index") idx.m.Lock() defer idx.m.Unlock() idx.final = true - - return idx.encode(w) } // ID returns the ID of the index, if available. 
If the index is not yet diff --git a/internal/repository/index_test.go b/internal/repository/index_test.go index d64e8585a..e1f2829bd 100644 --- a/internal/repository/index_test.go +++ b/internal/repository/index_test.go @@ -123,9 +123,10 @@ func TestIndexSerialize(t *testing.T) { } } - // serialize idx, unserialize to idx3 + // finalize; serialize idx, unserialize to idx3 + idx.Finalize() wr3 := bytes.NewBuffer(nil) - err = idx.Finalize(wr3) + err = idx.Encode(wr3) rtest.OK(t, err) rtest.Assert(t, idx.Final(), diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index 163884e4e..1caa42957 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -11,13 +11,14 @@ import ( // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. type MasterIndex struct { - idx []*Index - idxMutex sync.RWMutex + idx []*Index + pendingBlobs restic.BlobSet + idxMutex sync.RWMutex } // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { - return &MasterIndex{} + return &MasterIndex{pendingBlobs: restic.NewBlobSet()} } // Lookup queries all known Indexes for the ID and returns the first match. @@ -65,11 +66,42 @@ func (mi *MasterIndex) ListPack(id restic.ID) (list []restic.PackedBlob) { return nil } +// AddPending adds a given blob to list of pending Blobs +// Before doing so it checks if this blob is already known. +// Returns true if adding was successful and false if the blob +// was already known +func (mi *MasterIndex) addPending(id restic.ID, tpe restic.BlobType) bool { + + mi.idxMutex.Lock() + defer mi.idxMutex.Unlock() + + // Check if blob is pending or in index + if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) { + return false + } + + for _, idx := range mi.idx { + if idx.Has(id, tpe) { + return false + } + } + + // really not known -> insert + mi.pendingBlobs.Insert(restic.BlobHandle{ID: id, Type: tpe}) + return true +} + // Has queries all known Indexes for the ID and returns the first match. +// Also returns true if the ID is pending. func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool { mi.idxMutex.RLock() defer mi.idxMutex.RUnlock() + // also return true if blob is pending + if mi.pendingBlobs.Has(restic.BlobHandle{ID: id, Type: tpe}) { + return true + } + for _, idx := range mi.idx { if idx.Has(id, tpe) { return true @@ -114,24 +146,30 @@ func (mi *MasterIndex) Remove(index *Index) { } // Store remembers the id and pack in the index. -func (mi *MasterIndex) Store(pb restic.PackedBlob) { +func (mi *MasterIndex) StorePack(id restic.ID, blobs []restic.Blob) { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() + // delete blobs from pending + for _, blob := range blobs { + mi.pendingBlobs.Delete(restic.BlobHandle{Type: blob.Type, ID: blob.ID}) + } + for _, idx := range mi.idx { if !idx.Final() { - idx.Store(pb) + idx.StorePack(id, blobs) return } } newIdx := NewIndex() - newIdx.Store(pb) + newIdx.StorePack(id, blobs) mi.idx = append(mi.idx, newIdx) } -// NotFinalIndexes returns all indexes that have not yet been saved. 
-func (mi *MasterIndex) NotFinalIndexes() []*Index { +// FinalizeNotFinalIndexes finalizes all indexes that +// have not yet been saved and returns that list +func (mi *MasterIndex) FinalizeNotFinalIndexes() []*Index { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() @@ -139,6 +177,7 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index { for _, idx := range mi.idx { if !idx.Final() { + idx.Finalize() list = append(list, idx) } } @@ -147,8 +186,8 @@ func (mi *MasterIndex) NotFinalIndexes() []*Index { return list } -// FullIndexes returns all indexes that are full. -func (mi *MasterIndex) FullIndexes() []*Index { +// FinalizeFullIndexes finalizes all indexes that are full and returns that list. +func (mi *MasterIndex) FinalizeFullIndexes() []*Index { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() @@ -163,6 +202,7 @@ func (mi *MasterIndex) FullIndexes() []*Index { if IndexFull(idx) { debug.Log("index %p is full", idx) + idx.Finalize() list = append(list, idx) } else { debug.Log("index %p not full", idx) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 4884e0885..e1382c0be 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -136,20 +136,11 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe } // update blobs in the index - for _, b := range p.Packer.Blobs() { - debug.Log(" updating blob %v to pack %v", b.ID, id) - r.idx.Store(restic.PackedBlob{ - Blob: restic.Blob{ - Type: b.Type, - ID: b.ID, - Offset: b.Offset, - Length: uint(b.Length), - }, - PackID: id, - }) - } + debug.Log(" updating blobs %v to pack %v", p.Packer.Blobs(), id) + r.idx.StorePack(id, p.Packer.Blobs()) - return nil + // Save index if full + return r.SaveFullIndex(ctx) } // countPacker returns the number of open (unfinished) packers. diff --git a/internal/repository/repack.go b/internal/repository/repack.go index d0119c204..e39480fb1 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -84,7 +84,8 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee h, tempfile.Name(), id) } - _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID) + // We do want to save already saved blobs! 
+ _, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true) if err != nil { return nil, err } diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 84955ec30..0505c7211 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -31,18 +31,17 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl buf := make([]byte, length) rand.Read(buf) - id := restic.Hash(buf) - if repo.Index().Has(id, restic.DataBlob) { - t.Errorf("duplicate blob %v/%v ignored", id, restic.DataBlob) - continue - } - - _, err := repo.SaveBlob(context.TODO(), tpe, buf, id) + id, exists, err := repo.SaveBlob(context.TODO(), tpe, buf, restic.ID{}, false) if err != nil { t.Fatalf("SaveFrom() error %v", err) } + if exists { + t.Errorf("duplicate blob %v/%v ignored", id, restic.DataBlob) + continue + } + if rand.Float32() < 0.2 { if err = repo.Flush(context.Background()); err != nil { t.Fatalf("repo.Flush() returned error %v", err) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 08241f683..eba8a1362 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -221,13 +221,8 @@ func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bo // SaveAndEncrypt encrypts data and stores it to the backend as type t. If data // is small enough, it will be packed together with other small blobs. -func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data []byte, id *restic.ID) (restic.ID, error) { - if id == nil { - // compute plaintext hash - hashedID := restic.Hash(data) - id = &hashedID - } - +// The caller must ensure that the id matches the data. +func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) error { debug.Log("save id %v (%v, %d bytes)", id, t, len(data)) nonce := crypto.NewRandomNonce() @@ -252,24 +247,24 @@ func (r *Repository) SaveAndEncrypt(ctx context.Context, t restic.BlobType, data packer, err := pm.findPacker() if err != nil { - return restic.ID{}, err + return err } // save ciphertext - _, err = packer.Add(t, *id, ciphertext) + _, err = packer.Add(t, id, ciphertext) if err != nil { - return restic.ID{}, err + return err } // if the pack is not full enough, put back to the list if packer.Size() < minPackSize { debug.Log("pack is not full enough (%d bytes)", packer.Size()) pm.insertPacker(packer) - return *id, nil + return nil } // else write the pack to the backend - return *id, r.savePacker(ctx, t, packer) + return r.savePacker(ctx, t, packer) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the @@ -307,8 +302,18 @@ func (r *Repository) SaveUnpacked(ctx context.Context, t restic.FileType, p []by return id, nil } -// Flush saves all remaining packs. +// Flush saves all remaining packs and the index func (r *Repository) Flush(ctx context.Context) error { + if err := r.FlushPacks(ctx); err != nil { + return err + } + + // Save index after flushing + return r.SaveIndex(ctx) +} + +// FlushPacks saves all remaining packs. 
+func (r *Repository) FlushPacks(ctx context.Context) error { pms := []struct { t restic.BlobType pm *packerManager @@ -331,7 +336,6 @@ func (r *Repository) Flush(ctx context.Context) error { p.pm.packers = p.pm.packers[:0] p.pm.pm.Unlock() } - return nil } @@ -366,7 +370,7 @@ func (r *Repository) SetIndex(i restic.Index) error { func SaveIndex(ctx context.Context, repo restic.Repository, index *Index) (restic.ID, error) { buf := bytes.NewBuffer(nil) - err := index.Finalize(buf) + err := index.Encode(buf) if err != nil { return restic.ID{}, err } @@ -392,12 +396,12 @@ func (r *Repository) saveIndex(ctx context.Context, indexes ...*Index) error { // SaveIndex saves all new indexes in the backend. func (r *Repository) SaveIndex(ctx context.Context) error { - return r.saveIndex(ctx, r.idx.NotFinalIndexes()...) + return r.saveIndex(ctx, r.idx.FinalizeNotFinalIndexes()...) } // SaveFullIndex saves all full indexes in the backend. func (r *Repository) SaveFullIndex(ctx context.Context) error { - return r.saveIndex(ctx, r.idx.FullIndexes()...) + return r.saveIndex(ctx, r.idx.FinalizeFullIndexes()...) } const loadIndexParallelism = 4 @@ -670,14 +674,29 @@ func (r *Repository) Close() error { return r.be.Close() } -// SaveBlob saves a blob of type t into the repository. If id is the null id, it -// will be computed and returned. -func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) { - var i *restic.ID - if !id.IsNull() { - i = &id +// SaveBlob saves a blob of type t into the repository. +// It takes care that no duplicates are saved; this can be overridden +// by setting storeDuplicate to true. +// It also reports whether the blob was already known. +// If id is the null id, it will be computed and returned. +func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (newID restic.ID, known bool, err error) { + + // compute plaintext hash if not already set + if id.IsNull() { + newID = restic.Hash(buf) + } else { + newID = id } - return r.SaveAndEncrypt(ctx, t, buf, i) + + // first try to add to pending blobs; if not successful, this blob is already known + known = !r.idx.addPending(newID, t) + + // only save when needed or explicitly told + if !known || storeDuplicate { + err = r.SaveAndEncrypt(ctx, t, buf, newID) + } + + return newID, known, err } // LoadTree loads a tree from the repository.
@@ -711,12 +730,7 @@ func (r *Repository) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, e // adds a newline after each object) buf = append(buf, '\n') - id := restic.Hash(buf) - if r.idx.Has(id, restic.TreeBlob) { - return id, nil - } - - _, err = r.SaveBlob(ctx, restic.TreeBlob, buf, id) + id, _, err := r.SaveBlob(ctx, restic.TreeBlob, buf, restic.ID{}, false) return id, err } diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index b08448ca8..9bc5d5d24 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -34,7 +34,7 @@ func TestSave(t *testing.T) { id := restic.Hash(data) // save - sid, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}) + sid, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false) rtest.OK(t, err) rtest.Equals(t, id, sid) @@ -69,7 +69,7 @@ func TestSaveFrom(t *testing.T) { id := restic.Hash(data) // save - id2, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id) + id2, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false) rtest.OK(t, err) rtest.Equals(t, id, id2) @@ -108,7 +108,7 @@ func BenchmarkSaveAndEncrypt(t *testing.B) { for i := 0; i < t.N; i++ { // save - _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, data, id) + _, _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false) rtest.OK(t, err) } } @@ -158,7 +158,7 @@ func TestLoadBlob(t *testing.T) { _, err := io.ReadFull(rnd, buf) rtest.OK(t, err) - id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) + id, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(t, err) rtest.OK(t, repo.Flush(context.Background())) @@ -187,7 +187,7 @@ func BenchmarkLoadBlob(b *testing.B) { _, err := io.ReadFull(rnd, buf) rtest.OK(b, err) - id, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) + id, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(b, err) rtest.OK(b, repo.Flush(context.Background())) @@ -322,15 +322,17 @@ func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax _, err := io.ReadFull(rnd, buf) rtest.OK(t, err) - _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}) + _, _, err = repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(t, err) } } func TestRepositoryIncrementalIndex(t *testing.T) { - repo, cleanup := repository.TestRepository(t) + r, cleanup := repository.TestRepository(t) defer cleanup() + repo := r.(*repository.Repository) + repository.IndexFull = func(*repository.Index) bool { return true } // add 15 packs @@ -338,7 +340,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { // add 3 packs, write intermediate index for i := 0; i < 3; i++ { saveRandomDataBlobs(t, repo, 5, 1<<15) - rtest.OK(t, repo.Flush(context.Background())) + rtest.OK(t, repo.FlushPacks(context.Background())) } rtest.OK(t, repo.SaveFullIndex(context.TODO())) @@ -347,7 +349,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { // add another 5 packs for i := 0; i < 5; i++ { saveRandomDataBlobs(t, repo, 5, 1<<15) - rtest.OK(t, repo.Flush(context.Background())) + rtest.OK(t, repo.FlushPacks(context.Background())) } // save final index diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 9713a6c77..33299b13f 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -46,7 +46,7 @@ type 
Repository interface { LoadAndDecrypt(ctx context.Context, buf []byte, t FileType, id ID) (data []byte, err error) LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) - SaveBlob(context.Context, BlobType, []byte, ID) (ID, error) + SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, error) LoadTree(context.Context, ID) (*Tree, error) SaveTree(context.Context, *Tree) (ID, error) diff --git a/internal/restic/testing.go b/internal/restic/testing.go index b7a107f09..b3a42fd45 100644 --- a/internal/restic/testing.go +++ b/internal/restic/testing.go @@ -22,7 +22,6 @@ func fakeFile(seed, size int64) io.Reader { type fakeFileSystem struct { t testing.TB repo Repository - knownBlobs IDSet duplication float32 buf []byte chunker *chunker.Chunker @@ -55,12 +54,11 @@ func (fs *fakeFileSystem) saveFile(ctx context.Context, rd io.Reader) (blobs IDs id := Hash(chunk.Data) if !fs.blobIsKnown(id, DataBlob) { - _, err := fs.repo.SaveBlob(ctx, DataBlob, chunk.Data, id) + _, _, err := fs.repo.SaveBlob(ctx, DataBlob, chunk.Data, id, true) if err != nil { fs.t.Fatalf("error saving chunk: %v", err) } - fs.knownBlobs.Insert(id) } blobs = append(blobs, id) @@ -92,15 +90,10 @@ func (fs *fakeFileSystem) blobIsKnown(id ID, t BlobType) bool { return false } - if fs.knownBlobs.Has(id) { - return true - } - if fs.repo.Index().Has(id, t) { return true } - fs.knownBlobs.Insert(id) return false } @@ -147,7 +140,7 @@ func (fs *fakeFileSystem) saveTree(ctx context.Context, seed int64, depth int) I return id } - _, err := fs.repo.SaveBlob(ctx, TreeBlob, buf, id) + _, _, err := fs.repo.SaveBlob(ctx, TreeBlob, buf, id, false) if err != nil { fs.t.Fatal(err) } @@ -174,7 +167,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, fs := fakeFileSystem{ t: t, repo: repo, - knownBlobs: NewIDSet(), duplication: duplication, rand: rand.New(rand.NewSource(seed)), } @@ -196,11 +188,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, t.Fatal(err) } - err = repo.SaveIndex(context.TODO()) - if err != nil { - t.Fatal(err) - } - return snapshot } diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index 70136bfe3..0c86135a7 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -38,7 +38,7 @@ func saveFile(t testing.TB, repo restic.Repository, node File) restic.ID { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - id, err := repo.SaveBlob(ctx, restic.DataBlob, []byte(node.Data), restic.ID{}) + id, _, err := repo.SaveBlob(ctx, restic.DataBlob, []byte(node.Data), restic.ID{}, false) if err != nil { t.Fatal(err) } @@ -118,11 +118,6 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res t.Fatal(err) } - err = repo.SaveIndex(ctx) - if err != nil { - t.Fatal(err) - } - sn, err := restic.NewSnapshot([]string{"test"}, nil, "", time.Now()) if err != nil { t.Fatal(err)
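
Note: the following caller-side sketch is not part of the patch; it only illustrates the revised SaveBlob contract described in the commit message. With storeDuplicate set to false the repository skips blobs that are already indexed or pending and the second return value reports whether the blob was known; setting storeDuplicate to true forces a write, as Repack does. The helper name and its parameters below are made up for illustration.

package example

import (
	"context"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

// storeExample is a hypothetical caller of the new SaveBlob/Flush API.
func storeExample(ctx context.Context, repo *repository.Repository, buf []byte) error {
	// First attempt: deduplicating save. A null ID lets SaveBlob hash the
	// plaintext itself; if the blob is already indexed or pending in another
	// pack, nothing is written and known is true.
	id, known, err := repo.SaveBlob(ctx, restic.DataBlob, buf, restic.ID{}, false)
	if err != nil {
		return err
	}
	if known {
		// Repack-style callers that really want another copy (e.g. when
		// rewriting packs) can force a write with storeDuplicate=true.
		if _, _, err := repo.SaveBlob(ctx, restic.DataBlob, buf, id, true); err != nil {
			return err
		}
	}

	// Flush now writes the remaining open packs and then saves the index,
	// so callers no longer need a separate SaveIndex call.
	return repo.Flush(ctx)
}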
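
A second sketch, also not part of the patch, contrasts the new Flush and FlushPacks methods: storing a pack now triggers SaveFullIndex from savePacker, so full indexes are uploaded as a side effect, intermediate callers only need FlushPacks, and Flush finalizes and saves the remaining indexes at the end of a run (this is how the tests above use the two methods). The helper name and the final flag are illustrative assumptions.

package example

import (
	"context"

	"github.com/restic/restic/internal/repository"
)

// finishRun is a hypothetical helper contrasting FlushPacks and Flush.
func finishRun(ctx context.Context, repo *repository.Repository, final bool) error {
	if !final {
		// Intermediate point: write the open packs. Indexes that became full
		// while packs were stored have already been finalized and uploaded
		// via SaveFullIndex, so no separate index uploader is needed.
		return repo.FlushPacks(ctx)
	}
	// End of the run: write the remaining packs, then finalize and save all
	// not-yet-saved indexes in one step.
	return repo.Flush(ctx)
}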