From 00139648a01facf085dd94a789ddd9d9f6a69138 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Wed, 11 May 2016 22:28:56 +0200 Subject: [PATCH] Implement Repack() --- src/restic/repository/prune.go | 132 ++++++++++++++++++++++++++++ src/restic/repository/prune_test.go | 93 +++++++------------- 2 files changed, 163 insertions(+), 62 deletions(-) diff --git a/src/restic/repository/prune.go b/src/restic/repository/prune.go index a5fd08989..dd02782b6 100644 --- a/src/restic/repository/prune.go +++ b/src/restic/repository/prune.go @@ -1,8 +1,12 @@ package repository import ( + "fmt" + "os" "restic/backend" "restic/debug" + "restic/pack" + "restic/worker" ) // Repack takes a list of packs together with a list of blobs contained in @@ -10,5 +14,133 @@ import ( // into a new pack. Afterwards, the packs are removed. func Repack(repo *Repository, packs, keepBlobs backend.IDSet) error { debug.Log("Repack", "repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) + + var buf []byte + for packID := range packs { + list, err := repo.ListPack(packID) + if err != nil { + return err + } + + debug.Log("Repack", "processing pack %v, blobs: %v", packID.Str(), list) + + for _, blob := range list { + buf, err = repo.LoadBlob(blob.Type, blob.ID, buf) + if err != nil { + return err + } + debug.Log("Repack", " loaded blob %v", blob.ID.Str()) + + _, err = repo.SaveAndEncrypt(blob.Type, buf, &blob.ID) + if err != nil { + return err + } + + debug.Log("Repack", " saved blob %v", blob.ID.Str()) + } + } + + if err := repo.Flush(); err != nil { + return err + } + + for packID := range packs { + err := repo.Backend().Remove(backend.Data, packID.String()) + if err != nil { + debug.Log("Repack", "error removing pack %v: %v", packID.Str(), err) + return err + } + debug.Log("Repack", "removed pack %v", packID.Str()) + } + + return nil +} + +const rebuildIndexWorkers = 10 + +type loadBlobsResult struct { + packID backend.ID + entries []pack.Blob +} + +// loadBlobsFromAllPacks sends the contents of all packs to ch. +func loadBlobsFromAllPacks(repo *Repository, ch chan<- worker.Job, done <-chan struct{}) { + f := func(job worker.Job, done <-chan struct{}) (interface{}, error) { + packID := job.Data.(backend.ID) + entries, err := repo.ListPack(packID) + return loadBlobsResult{ + packID: packID, + entries: entries, + }, err + } + + jobCh := make(chan worker.Job) + wp := worker.New(rebuildIndexWorkers, f, jobCh, ch) + + go func() { + for id := range repo.List(backend.Data, done) { + jobCh <- worker.Job{Data: id} + } + close(jobCh) + }() + + wp.Wait() +} + +// RebuildIndex lists all packs in the repo, writes a new index and removes all +// old indexes. This operation should only be done with an exclusive lock in +// place. +func RebuildIndex(repo *Repository) error { + debug.Log("RebuildIndex", "start rebuilding index") + + done := make(chan struct{}) + defer close(done) + + ch := make(chan worker.Job) + go loadBlobsFromAllPacks(repo, ch, done) + + idx := NewIndex() + for job := range ch { + id := job.Data.(backend.ID) + + if job.Error != nil { + fmt.Fprintf(os.Stderr, "error for pack %v: %v\n", id, job.Error) + continue + } + + res := job.Result.(loadBlobsResult) + + for _, entry := range res.entries { + pb := PackedBlob{ + ID: entry.ID, + Type: entry.Type, + Length: entry.Length, + Offset: entry.Offset, + PackID: res.packID, + } + idx.Store(pb) + } + } + + oldIndexes := backend.NewIDSet() + for id := range repo.List(backend.Index, done) { + idx.AddToSupersedes(id) + oldIndexes.Insert(id) + } + + id, err := SaveIndex(repo, idx) + if err != nil { + debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err) + return err + } + debug.Log("RebuildIndex.RebuildIndex", "new index saved as %v", id.Str()) + + for indexID := range oldIndexes { + err := repo.Backend().Remove(backend.Index, indexID.String()) + if err != nil { + fmt.Fprintf(os.Stderr, "unable to remove index %v: %v\n", indexID.Str(), err) + } + } + return nil } diff --git a/src/restic/repository/prune_test.go b/src/restic/repository/prune_test.go index 227f1c73b..cfcc3aa7a 100644 --- a/src/restic/repository/prune_test.go +++ b/src/restic/repository/prune_test.go @@ -55,55 +55,6 @@ func createRandomBlobs(t *testing.T, repo *Repository, blobs int, pData float32) } } -// redundancy returns the amount of duplicate data in the repo. It only looks -// at all pack files. -func redundancy(t *testing.T, repo *Repository) float32 { - done := make(chan struct{}) - defer close(done) - - type redEntry struct { - count int - size int - } - red := make(map[backend.ID]redEntry) - - for id := range repo.List(backend.Data, done) { - entries, err := repo.ListPack(id) - if err != nil { - t.Fatalf("error listing pack %v: %v", id.Str(), err) - } - - for _, e := range entries { - updatedEntry := redEntry{ - count: 1, - size: int(e.Length), - } - - if oldEntry, ok := red[e.ID]; ok { - updatedEntry.count += oldEntry.count - - if updatedEntry.size != oldEntry.size { - t.Fatalf("sizes do not match: %v != %v", updatedEntry.size, oldEntry.size) - } - } - - red[e.ID] = updatedEntry - } - } - - totalBytes := 0 - redundantBytes := 0 - for _, v := range red { - totalBytes += v.count * v.size - - if v.count > 1 { - redundantBytes += (v.count - 1) * v.size - } - } - - return float32(redundantBytes) / float32(totalBytes) -} - // selectBlobs returns a list of random blobs from the repository with probability p. func selectBlobs(t *testing.T, repo *Repository, p float32) backend.IDSet { done := make(chan struct{}) @@ -155,6 +106,32 @@ func findPacksForBlobs(t *testing.T, repo *Repository, blobs backend.IDSet) back return packs } +func repack(t *testing.T, repo *Repository, packs, blobs backend.IDSet) { + err := Repack(repo, packs, blobs) + if err != nil { + t.Fatal(err) + } +} + +func saveIndex(t *testing.T, repo *Repository) { + if err := repo.SaveIndex(); err != nil { + t.Fatalf("repo.SaveIndex() %v", err) + } +} + +func rebuildIndex(t *testing.T, repo *Repository) { + if err := RebuildIndex(repo); err != nil { + t.Fatalf("error rebuilding index: %v", err) + } +} + +func reloadIndex(t *testing.T, repo *Repository) { + repo.SetIndex(NewMasterIndex()) + if err := repo.LoadIndex(); err != nil { + t.Fatalf("error loading new index: %v", err) + } +} + func TestRepack(t *testing.T) { repo, cleanup := TestRepository(t) defer cleanup() @@ -164,10 +141,7 @@ func TestRepack(t *testing.T) { packsBefore := listPacks(t, repo) // Running repack on empty ID sets should not do anything at all. - err := Repack(repo, nil, nil) - if err != nil { - t.Fatal(err) - } + repack(t, repo, nil, nil) packsAfter := listPacks(t, repo) @@ -176,19 +150,14 @@ func TestRepack(t *testing.T) { packsBefore, packsAfter) } - if err := repo.SaveIndex(); err != nil { - t.Fatalf("repo.SaveIndex() %v", err) - } + saveIndex(t, repo) blobs := selectBlobs(t, repo, 0.2) - t.Logf("selected %d blobs: %v", len(blobs), blobs) - packs := findPacksForBlobs(t, repo, blobs) - err = Repack(repo, packs, blobs) - if err != nil { - t.Fatalf("Repack() error %v", err) - } + repack(t, repo, packs, blobs) + rebuildIndex(t, repo) + reloadIndex(t, repo) packsAfter = listPacks(t, repo) for id := range packs {