diff --git a/internal/repository/index.go b/internal/repository/index.go
index c5dcf4f56..30fe3ddb9 100644
--- a/internal/repository/index.go
+++ b/internal/repository/index.go
@@ -275,6 +275,58 @@ func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
 	return ch
 }
 
+// EachByPackResult is the value sent on the channel returned by EachByPack:
+// the ID of one pack and the blobs of one blob type listed for that pack.
+type EachByPackResult struct {
+	packID restic.ID
+	blobs  []restic.Blob
+}
+
+// EachByPack returns a channel that yields all blobs known to the index,
+// grouped by pack ID; blobs whose pack ID is in packBlacklist are skipped.
+// The index mutex is taken before EachByPack returns and is released by the
+// background goroutine when it terminates, so the index cannot be modified
+// until the channel has been drained or the context has been cancelled.
+func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
+	idx.m.Lock()
+
+	ch := make(chan EachByPackResult)
+
+	go func() {
+		// Deferred calls run last-in-first-out: close the channel, then unlock.
+		defer idx.m.Unlock()
+		defer close(ch)
+
+		for typ := range idx.byType {
+			// group the entries of this blob type by the pack containing them
+			byPack := make(map[restic.ID][]*indexEntry)
+			m := &idx.byType[typ]
+			m.foreach(func(e *indexEntry) bool {
+				packID := idx.packs[e.packIndex]
+				if !packBlacklist.Has(packID) {
+					byPack[packID] = append(byPack[packID], e)
+				}
+				return true
+			})
+
+			for packID, pack := range byPack {
+				var result EachByPackResult
+				result.packID = packID
+				for _, e := range pack {
+					result.blobs = append(result.blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
+				}
+				select {
+				case <-ctx.Done():
+					return
+				case ch <- result:
+				}
+			}
+		}
+	}()
+
+	return ch
+}
+
 // Packs returns all packs in this index
 func (idx *Index) Packs() restic.IDSet {
 	idx.m.Lock()
diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go
index 6e84d3b1b..ddc72fd3c 100644
--- a/internal/repository/master_index.go
+++ b/internal/repository/master_index.go
@@ -97,6 +97,19 @@ func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
 	return false
 }
 
+// Packs returns all packs that are covered by the index.
+func (mi *MasterIndex) Packs() restic.IDSet {
+	mi.idxMutex.RLock()
+	defer mi.idxMutex.RUnlock()
+
+	packs := restic.NewIDSet()
+	for _, idx := range mi.idx {
+		packs.Merge(idx.Packs())
+	}
+
+	return packs
+}
+
 // Count returns the number of blobs of type t in the index.
 func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
 	mi.idxMutex.RLock()
@@ -248,49 +261,72 @@ func (mi *MasterIndex) MergeFinalIndexes() {
 	mi.idx = newIdx
 }
 
-// RebuildIndex combines all known indexes to a new index, leaving out any
+// Save saves all known indexes to index files, leaving out any
 // packs whose ID is contained in packBlacklist. The new index contains the IDs
-// of all known indexes in the "supersedes" field.
-func (mi *MasterIndex) RebuildIndex(ctx context.Context, packBlacklist restic.IDSet) (*Index, error) {
+// of all known indexes in the "supersedes" field. The IDs are also returned in
+// the IDSet obsolete.
+// After calling this function, you should remove the obsolete index files.
+func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *restic.Progress) (obsolete restic.IDSet, err error) {
+	p.Start()
+	defer p.Done()
+
 	mi.idxMutex.Lock()
 	defer mi.idxMutex.Unlock()
 
 	debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
 
 	newIndex := NewIndex()
+	obsolete = restic.NewIDSet()
 
+	// Cancel on every return path so that an EachByPack goroutine blocked on
+	// its channel send terminates and releases the lock of its index.
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	for i, idx := range mi.idx {
-		debug.Log("adding index %d", i)
-
-		for pb := range idx.Each(ctx) {
-			if packBlacklist.Has(pb.PackID) {
-				continue
-			}
-
-			newIndex.Store(pb)
-		}
-
-		if !idx.Final() {
-			debug.Log("index %d isn't final, don't add to supersedes field", i)
-			continue
-		}
-
-		ids, err := idx.IDs()
-		if err != nil {
-			debug.Log("index %d does not have an ID: %v", err)
-			return nil, err
-		}
-
-		debug.Log("adding index ids %v to supersedes field", ids)
-
-		err = newIndex.AddToSupersedes(ids...)
-		if err != nil {
-			return nil, err
+	// finalize saves the index built so far and starts a new, empty one.
+	finalize := func() error {
+		newIndex.Finalize()
+		if _, err := SaveIndex(ctx, repo, newIndex); err != nil {
+			return err
 		}
+		newIndex = NewIndex()
+		return nil
 	}
 
-	return newIndex, nil
+	for i, idx := range mi.idx {
+		if idx.Final() {
+			ids, err := idx.IDs()
+			if err != nil {
+				debug.Log("index %d does not have an ID: %v", i, err)
+				return nil, err
+			}
+
+			debug.Log("adding index ids %v to supersedes field", ids)
+
+			err = newIndex.AddToSupersedes(ids...)
+			if err != nil {
+				return nil, err
+			}
+			obsolete.Merge(restic.NewIDSet(ids...))
+		} else {
+			debug.Log("index %d isn't final, don't add to supersedes field", i)
+		}
+
+		debug.Log("adding index %d", i)
+
+		for pbs := range idx.EachByPack(ctx, packBlacklist) {
+			newIndex.StorePack(pbs.packID, pbs.blobs)
+			p.Report(restic.Stat{Blobs: 1})
+			if IndexFull(newIndex) {
+				if err := finalize(); err != nil {
+					return nil, err
+				}
+			}
+		}
+	}
+	if err := finalize(); err != nil {
+		return nil, err
+	}
+
+	return obsolete, nil
 }
diff --git a/internal/repository/master_index_test.go b/internal/repository/master_index_test.go
index 28eba03a8..aa9905321 100644
--- a/internal/repository/master_index_test.go
+++ b/internal/repository/master_index_test.go
@@ -5,7 +5,9 @@ import (
 	"fmt"
 	"math/rand"
 	"testing"
+	"time"
 
+	"github.com/restic/restic/internal/checker"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	rtest "github.com/restic/restic/internal/test"
@@ -322,3 +324,67 @@ func BenchmarkMasterIndexLookupBlobSize(b *testing.B) {
 		mIdx.LookupSize(lookupID, restic.DataBlob)
 	}
 }
+
+var (
+	snapshotTime = time.Unix(1470492820, 207401672)
+	depth        = 3
+)
+
+// createFilledRepo returns a test repository filled with the given number of
+// snapshots, together with the cleanup function of the repository.
+func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) {
+	repo, cleanup := repository.TestRepository(t)
+
+	for i := 0; i < snapshots; i++ {
+		restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup)
+	}
+
+	return repo, cleanup
+}
+
+func TestIndexSave(t *testing.T) {
+	repo, cleanup := createFilledRepo(t, 3, 0)
+	defer cleanup()
+
+	rtest.OK(t, repo.LoadIndex(context.TODO()))
+
+	obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil)
+	if err != nil {
+		t.Fatalf("unable to save new index: %v", err)
+	}
+
+	for id := range obsoletes {
+		t.Logf("remove index %v", id.Str())
+		h := restic.Handle{Type: restic.IndexFile, Name: id.String()}
+		err = repo.Backend().Remove(context.TODO(), h)
+		if err != nil {
+			t.Errorf("error removing index %v: %v", id, err)
+		}
+	}
+
+	chkr := checker.New(repo)
+	hints, errs := chkr.LoadIndex(context.TODO())
+	for _, h := range hints {
+		t.Logf("hint: %v\n", h)
+	}
+
+	for _, err := range errs {
+		t.Errorf("checker found error: %v", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+
+	errCh := make(chan error)
+	go chkr.Structure(ctx, errCh)
+	i := 0
+	for err := range errCh {
+		t.Errorf("checker returned error: %v", err)
+		i++
+		if i == 10 {
+			t.Errorf("more than 10 errors returned, skipping the rest")
+			cancel()
+			break
+		}
+	}
+}
diff --git a/internal/restic/repository.go b/internal/restic/repository.go
index dcda37f04..5efdfbc03 100644
--- a/internal/restic/repository.go
+++ b/internal/restic/repository.go
@@ -62,6 +62,7 @@ type MasterIndex interface {
 	Has(ID, BlobType) bool
 	Lookup(ID, BlobType) []PackedBlob
 	Count(BlobType) uint
+	Packs() IDSet
 
 	// Each returns a channel that yields all blobs known to the index. When
 	// the context is cancelled, the background goroutine terminates. This