diff --git a/changelog/unreleased/pull-3006 b/changelog/unreleased/pull-3006 new file mode 100644 index 000000000..71f110101 --- /dev/null +++ b/changelog/unreleased/pull-3006 @@ -0,0 +1,9 @@ +Enhancement: Speed up rebuild-index + +We've optimized the command rebuild-index. Now, existing index entries are used +to minimize the number of pack files that must be read. This speeds up the index +rebuild a lot. +Also the option --read-all-packs was added which implements the previous behavior. + +https://github.com/restic/restic/issue/2547 +https://github.com/restic/restic/pull/3006 diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index d4073107c..f85a9a672 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -498,7 +498,7 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB if len(removePacks) != 0 { totalpacks := int(stats.packs.used+stats.packs.partlyUsed+stats.packs.unused) - len(removePacks) + packsAddedByRepack - err = rebuildIndexFiles(gopts, repo, removePacks, uint64(totalpacks)) + err = rebuildIndexFiles(gopts, repo, removePacks, nil, uint64(totalpacks)) if err != nil { return err } @@ -511,12 +511,12 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB return nil } -func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, packcount uint64) error { +func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, packcount uint64) error { Verbosef("rebuilding index\n") bar := newProgressMax(!gopts.Quiet, packcount, "packs processed") obsoleteIndexes, err := (repo.Index()).(*repository.MasterIndex). - Save(gopts.ctx, repo, removePacks, bar) + Save(gopts.ctx, repo, removePacks, extraObsolete, bar) bar.Done() if err != nil { return err diff --git a/cmd/restic/cmd_rebuild_index.go b/cmd/restic/cmd_rebuild_index.go index edae215a6..850c03dd2 100644 --- a/cmd/restic/cmd_rebuild_index.go +++ b/cmd/restic/cmd_rebuild_index.go @@ -1,10 +1,8 @@ package main import ( - "context" - - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/index" + "github.com/restic/restic/internal/pack" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/spf13/cobra" @@ -12,7 +10,7 @@ import ( var cmdRebuildIndex = &cobra.Command{ Use: "rebuild-index [flags]", - Short: "Build a new index file", + Short: "Build a new index", Long: ` The "rebuild-index" command creates a new index based on the pack files in the repository. @@ -24,15 +22,25 @@ Exit status is 0 if the command was successful, and non-zero if there was any er `, DisableAutoGenTag: true, RunE: func(cmd *cobra.Command, args []string) error { - return runRebuildIndex(globalOptions) + return runRebuildIndex(rebuildIndexOptions, globalOptions) }, } -func init() { - cmdRoot.AddCommand(cmdRebuildIndex) +// RebuildIndexOptions collects all options for the rebuild-index command. +type RebuildIndexOptions struct { + ReadAllPacks bool } -func runRebuildIndex(gopts GlobalOptions) error { +var rebuildIndexOptions RebuildIndexOptions + +func init() { + cmdRoot.AddCommand(cmdRebuildIndex) + f := cmdRebuildIndex.Flags() + f.BoolVar(&rebuildIndexOptions.ReadAllPacks, "read-all-packs", false, "read all pack files to generate new index from scratch") + +} + +func runRebuildIndex(opts RebuildIndexOptions, gopts GlobalOptions) error { repo, err := OpenRepository(gopts) if err != nil { return err @@ -44,59 +52,97 @@ func runRebuildIndex(gopts GlobalOptions) error { return err } - ctx, cancel := context.WithCancel(gopts.ctx) - defer cancel() - return rebuildIndex(ctx, repo, restic.NewIDSet()) + return rebuildIndex(opts, gopts, repo, restic.NewIDSet()) } -func rebuildIndex(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet) error { - Verbosef("counting files in repo\n") +func rebuildIndex(opts RebuildIndexOptions, gopts GlobalOptions, repo *repository.Repository, ignorePacks restic.IDSet) error { + ctx := gopts.ctx - var packs uint64 - err := repo.List(ctx, restic.PackFile, func(restic.ID, int64) error { - packs++ - return nil - }) - if err != nil { - return err - } + var obsoleteIndexes restic.IDs + packSizeFromList := make(map[restic.ID]int64) + removePacks := restic.NewIDSet() + totalPacks := 0 - bar := newProgressMax(!globalOptions.Quiet, packs-uint64(len(ignorePacks)), "packs") - idx, invalidFiles, err := index.New(ctx, repo, ignorePacks, bar) - bar.Done() - if err != nil { - return err - } + if opts.ReadAllPacks { + // get old index files + err := repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + obsoleteIndexes = append(obsoleteIndexes, id) + return nil + }) + if err != nil { + return err + } - if globalOptions.verbosity >= 2 { - for _, id := range invalidFiles { - Printf("skipped incomplete pack file: %v\n", id) + Verbosef("finding pack files in repo...\n") + err = repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { + packSizeFromList[id] = size + removePacks.Insert(id) + totalPacks++ + return nil + }) + if err != nil { + return err + } + } else { + Verbosef("loading indexes...\n") + err := repo.LoadIndex(gopts.ctx) + if err != nil { + return err + } + + Verbosef("getting pack files to read...\n") + + // Compute size of each pack from index entries + packSizeFromIndex := make(map[restic.ID]int64) + for blob := range repo.Index().Each(ctx) { + size, ok := packSizeFromIndex[blob.PackID] + if !ok { + size = pack.HeaderSize + } + // update packSizeFromIndex + packSizeFromIndex[blob.PackID] = size + int64(pack.PackedSizeOfBlob(blob.Length)) + } + + err = repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { + size, ok := packSizeFromIndex[id] + if !ok || size != packSize { + // Pack was not referenced in index or size does not match + packSizeFromList[id] = size + removePacks.Insert(id) + } + totalPacks++ + delete(packSizeFromIndex, id) + return nil + }) + if err != nil { + return err + } + for id := range packSizeFromIndex { + // forget pack files that are referenced in the index but do not exist + // when rebuilding the index + removePacks.Insert(id) } } - Verbosef("finding old index files\n") + if len(packSizeFromList) > 0 { + Verbosef("reading pack files\n") + bar := newProgressMax(!globalOptions.Quiet, uint64(len(packSizeFromList)), "packs") + invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar) + if err != nil { + return err + } - var supersedes restic.IDs - err = repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { - supersedes = append(supersedes, id) - return nil - }) + for _, id := range invalidFiles { + Verboseff("skipped incomplete pack file: %v\n", id) + totalPacks-- + } + } + + err := rebuildIndexFiles(gopts, repo, removePacks, obsoleteIndexes, uint64(totalPacks)) if err != nil { return err } - - ids, err := idx.Save(ctx, repo, supersedes) - if err != nil { - return errors.Fatalf("unable to save index, last error was: %v", err) - } - - Verbosef("saved new indexes as %v\n", ids) - - Verbosef("remove %d old index files\n", len(supersedes)) - err = DeleteFilesChecked(globalOptions, repo, restic.NewIDSet(supersedes...), restic.IndexFile) - if err != nil { - return errors.Fatalf("unable to remove an old index: %v\n", err) - } + Verbosef("done\n") return nil } diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index 42fe34886..be8ec2bd9 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -175,7 +175,7 @@ func testRunRebuildIndex(t testing.TB, gopts GlobalOptions) { globalOptions.stdout = os.Stdout }() - rtest.OK(t, runRebuildIndex(gopts)) + rtest.OK(t, runRebuildIndex(RebuildIndexOptions{}, gopts)) } func testRunLs(t testing.TB, gopts GlobalOptions, snapshotID string) []string { @@ -1351,7 +1351,7 @@ func TestRebuildIndexFailsOnAppendOnly(t *testing.T) { env.gopts.backendTestHook = func(r restic.Backend) (restic.Backend, error) { return &appendOnlyBackend{r}, nil } - err := runRebuildIndex(env.gopts) + err := runRebuildIndex(RebuildIndexOptions{}, env.gopts) if err == nil { t.Error("expected rebuildIndex to fail") } @@ -1583,7 +1583,7 @@ func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func( return be.Backend.List(ctx, t, fn) } -func TestPruneListOnce(t *testing.T) { +func TestListOnce(t *testing.T) { env, cleanup := withTestEnvironment(t) defer cleanup() @@ -1613,6 +1613,9 @@ func TestPruneListOnce(t *testing.T) { testRunForget(t, env.gopts, firstSnapshot[0].String()) testRunPrune(t, env.gopts, pruneOpts) rtest.OK(t, runCheck(checkOpts, env.gopts, nil)) + + rtest.OK(t, runRebuildIndex(RebuildIndexOptions{}, env.gopts)) + rtest.OK(t, runRebuildIndex(RebuildIndexOptions{ReadAllPacks: true}, env.gopts)) } func TestHardLink(t *testing.T) { diff --git a/internal/index/index.go b/internal/index/index.go deleted file mode 100644 index 4c0f5fee7..000000000 --- a/internal/index/index.go +++ /dev/null @@ -1,373 +0,0 @@ -// Package index contains various data structures for indexing content in a repository or backend. -package index - -import ( - "context" - "fmt" - "os" - "sync" - - "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/pack" - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/ui/progress" - - "golang.org/x/sync/errgroup" -) - -// Pack contains information about the contents of a pack. -type Pack struct { - ID restic.ID - Size int64 - Entries []restic.Blob -} - -// Index contains information about blobs and packs stored in a repo. -type Index struct { - Packs map[restic.ID]Pack - IndexIDs restic.IDSet -} - -func newIndex() *Index { - return &Index{ - Packs: make(map[restic.ID]Pack), - IndexIDs: restic.NewIDSet(), - } -} - -const listPackWorkers = 10 - -// Lister lists files and their contents -type Lister interface { - // List runs fn for all files of type t in the repo. - List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error - - // ListPack returns the list of blobs saved in the pack id and the length - // of the file as stored in the backend. - ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) -} - -// New creates a new index for repo from scratch. InvalidFiles contains all IDs -// of files that cannot be listed successfully. -func New(ctx context.Context, repo Lister, ignorePacks restic.IDSet, p *progress.Counter) (idx *Index, invalidFiles restic.IDs, err error) { - type Job struct { - PackID restic.ID - Size int64 - } - - type Result struct { - Error error - PackID restic.ID - Size int64 - Entries []restic.Blob - } - - inputCh := make(chan Job) - outputCh := make(chan Result) - wg, ctx := errgroup.WithContext(ctx) - - // list the files in the repo, send to inputCh - wg.Go(func() error { - defer close(inputCh) - return repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { - if ignorePacks.Has(id) { - return nil - } - - job := Job{ - PackID: id, - Size: size, - } - - select { - case inputCh <- job: - case <-ctx.Done(): - } - return nil - }) - }) - - // run the workers listing the files, read from inputCh, send to outputCh - var workers sync.WaitGroup - for i := 0; i < listPackWorkers; i++ { - workers.Add(1) - go func() { - defer workers.Done() - for job := range inputCh { - res := Result{PackID: job.PackID} - res.Entries, res.Size, res.Error = repo.ListPack(ctx, job.PackID, job.Size) - - select { - case outputCh <- res: - case <-ctx.Done(): - return - } - } - }() - } - - // wait until all the workers are done, then close outputCh - wg.Go(func() error { - workers.Wait() - close(outputCh) - return nil - }) - - idx = newIndex() - - for res := range outputCh { - p.Add(1) - if res.Error != nil { - cause := errors.Cause(res.Error) - if _, ok := cause.(pack.InvalidFileError); ok { - invalidFiles = append(invalidFiles, res.PackID) - continue - } - - fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", res.PackID, res.Error) - continue - } - - debug.Log("pack %v contains %d blobs", res.PackID, len(res.Entries)) - - err := idx.AddPack(res.PackID, res.Size, res.Entries) - if err != nil { - return nil, nil, err - } - - select { - case <-ctx.Done(): // an error occurred - default: - } - } - - err = wg.Wait() - if err != nil { - return nil, nil, err - } - - return idx, invalidFiles, nil -} - -type packJSON struct { - ID restic.ID `json:"id"` - Blobs []blobJSON `json:"blobs"` -} - -type blobJSON struct { - ID restic.ID `json:"id"` - Type restic.BlobType `json:"type"` - Offset uint `json:"offset"` - Length uint `json:"length"` -} - -type indexJSON struct { - Supersedes restic.IDs `json:"supersedes,omitempty"` - Packs []packJSON `json:"packs"` -} - -// ListLoader allows listing files and their content, in addition to loading and unmarshaling JSON files. -type ListLoader interface { - Lister - LoadJSONUnpacked(context.Context, restic.FileType, restic.ID, interface{}) error -} - -func loadIndexJSON(ctx context.Context, repo ListLoader, id restic.ID) (*indexJSON, error) { - debug.Log("process index %v\n", id) - - var idx indexJSON - err := repo.LoadJSONUnpacked(ctx, restic.IndexFile, id, &idx) - if err != nil { - return nil, err - } - - return &idx, nil -} - -// Load creates an index by loading all index files from the repo. -func Load(ctx context.Context, repo ListLoader, p *progress.Counter) (*Index, error) { - debug.Log("loading indexes") - - supersedes := make(map[restic.ID]restic.IDSet) - results := make(map[restic.ID]map[restic.ID]Pack) - - index := newIndex() - - err := repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { - p.Add(1) - - debug.Log("Load index %v", id) - idx, err := loadIndexJSON(ctx, repo, id) - if err != nil { - return err - } - - res := make(map[restic.ID]Pack) - supersedes[id] = restic.NewIDSet() - for _, sid := range idx.Supersedes { - debug.Log(" index %v supersedes %v", id, sid) - supersedes[id].Insert(sid) - } - - for _, jpack := range idx.Packs { - entries := make([]restic.Blob, 0, len(jpack.Blobs)) - for _, blob := range jpack.Blobs { - entry := restic.Blob{ - ID: blob.ID, - Type: blob.Type, - Offset: blob.Offset, - Length: blob.Length, - } - entries = append(entries, entry) - } - - if err = index.AddPack(jpack.ID, 0, entries); err != nil { - return err - } - } - - results[id] = res - index.IndexIDs.Insert(id) - - return nil - }) - - if err != nil { - return nil, err - } - - for superID, list := range supersedes { - for indexID := range list { - if _, ok := results[indexID]; !ok { - continue - } - debug.Log(" removing index %v, superseded by %v", indexID, superID) - fmt.Fprintf(os.Stderr, "index %v can be removed, superseded by index %v\n", indexID.Str(), superID.Str()) - delete(results, indexID) - } - } - - return index, nil -} - -// AddPack adds a pack to the index. If this pack is already in the index, an -// error is returned. -func (idx *Index) AddPack(id restic.ID, size int64, entries []restic.Blob) error { - if _, ok := idx.Packs[id]; ok { - return errors.Errorf("pack %v already present in the index", id.Str()) - } - - idx.Packs[id] = Pack{ID: id, Size: size, Entries: entries} - - return nil -} - -// RemovePack deletes a pack from the index. -func (idx *Index) RemovePack(id restic.ID) error { - if _, ok := idx.Packs[id]; !ok { - return errors.Errorf("pack %v not found in the index", id.Str()) - } - - delete(idx.Packs, id) - - return nil -} - -// DuplicateBlobs returns a list of blobs that are stored more than once in the -// repo. -func (idx *Index) DuplicateBlobs() (dups restic.BlobSet) { - dups = restic.NewBlobSet() - seen := restic.NewBlobSet() - - for _, p := range idx.Packs { - for _, entry := range p.Entries { - h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} - if seen.Has(h) { - dups.Insert(h) - } - seen.Insert(h) - } - } - - return dups -} - -// PacksForBlobs returns the set of packs in which the blobs are contained. -func (idx *Index) PacksForBlobs(blobs restic.BlobSet) (packs restic.IDSet) { - packs = restic.NewIDSet() - - for id, p := range idx.Packs { - for _, entry := range p.Entries { - if blobs.Has(restic.BlobHandle{ID: entry.ID, Type: entry.Type}) { - packs.Insert(id) - } - } - } - - return packs -} - -const maxEntries = 3000 - -// Saver saves structures as JSON. -type Saver interface { - SaveJSONUnpacked(ctx context.Context, t restic.FileType, item interface{}) (restic.ID, error) -} - -// Save writes the complete index to the repo. -func (idx *Index) Save(ctx context.Context, repo Saver, supersedes restic.IDs) (restic.IDs, error) { - debug.Log("pack files: %d\n", len(idx.Packs)) - - var indexIDs []restic.ID - - packs := 0 - jsonIDX := &indexJSON{ - Supersedes: supersedes, - Packs: make([]packJSON, 0, maxEntries), - } - - for packID, pack := range idx.Packs { - debug.Log("%04d add pack %v with %d entries", packs, packID, len(pack.Entries)) - b := make([]blobJSON, 0, len(pack.Entries)) - for _, blob := range pack.Entries { - b = append(b, blobJSON{ - ID: blob.ID, - Type: blob.Type, - Offset: blob.Offset, - Length: blob.Length, - }) - } - - p := packJSON{ - ID: packID, - Blobs: b, - } - - jsonIDX.Packs = append(jsonIDX.Packs, p) - - packs++ - if packs == maxEntries { - id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX) - if err != nil { - return nil, err - } - debug.Log("saved new index as %v", id) - - indexIDs = append(indexIDs, id) - packs = 0 - jsonIDX.Packs = jsonIDX.Packs[:0] - } - } - - if packs > 0 { - id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX) - if err != nil { - return nil, err - } - debug.Log("saved new index as %v", id) - indexIDs = append(indexIDs, id) - } - - return indexIDs, nil -} diff --git a/internal/index/index_test.go b/internal/index/index_test.go deleted file mode 100644 index f4ff53f7a..000000000 --- a/internal/index/index_test.go +++ /dev/null @@ -1,497 +0,0 @@ -package index - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/restic/restic/internal/checker" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/repository" - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/test" -) - -var ( - snapshotTime = time.Unix(1470492820, 207401672) - depth = 3 -) - -func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) { - repo, cleanup := repository.TestRepository(t) - - for i := 0; i < 3; i++ { - restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup) - } - - return repo, cleanup -} - -func validateIndex(t testing.TB, repo restic.Repository, idx *Index) { - err := repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { - p, ok := idx.Packs[id] - if !ok { - t.Errorf("pack %v missing from index", id.Str()) - } - - if !p.ID.Equal(id) { - t.Errorf("pack %v has invalid ID: want %v, got %v", id.Str(), id, p.ID) - } - return nil - }) - - if err != nil { - t.Fatal(err) - } -} - -func TestIndexNew(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - idx, invalid, err := New(context.TODO(), repo, restic.NewIDSet(), nil) - if err != nil { - t.Fatalf("New() returned error %v", err) - } - - if idx == nil { - t.Fatalf("New() returned nil index") - } - - if len(invalid) > 0 { - t.Fatalf("New() returned invalid files: %v", invalid) - } - - validateIndex(t, repo, idx) -} - -type ErrorRepo struct { - restic.Repository - MaxListFiles int - - MaxPacks int - MaxPacksMutex sync.Mutex -} - -// List returns an error after repo.MaxListFiles files. -func (repo *ErrorRepo) List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error { - if repo.MaxListFiles == 0 { - return errors.New("test error, max is zero") - } - - max := repo.MaxListFiles - return repo.Repository.List(ctx, t, func(id restic.ID, size int64) error { - if max == 0 { - return errors.New("test error, max reached zero") - } - - max-- - return fn(id, size) - }) -} - -// ListPack returns an error after repo.MaxPacks files. -func (repo *ErrorRepo) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) { - repo.MaxPacksMutex.Lock() - max := repo.MaxPacks - if max > 0 { - repo.MaxPacks-- - } - repo.MaxPacksMutex.Unlock() - - if max == 0 { - return nil, 0, errors.New("test list pack error") - } - - return repo.Repository.ListPack(ctx, id, size) -} - -func TestIndexNewListErrors(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - for _, max := range []int{0, 3, 5} { - errRepo := &ErrorRepo{ - Repository: repo, - MaxListFiles: max, - } - idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) - if err == nil { - t.Errorf("expected error not found, got nil") - } - - if idx != nil { - t.Errorf("expected nil index, got %v", idx) - } - - if len(invalid) != 0 { - t.Errorf("expected empty invalid list, got %v", invalid) - } - } -} - -func TestIndexNewPackErrors(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - for _, max := range []int{0, 3, 5} { - errRepo := &ErrorRepo{ - Repository: repo, - MaxPacks: max, - } - idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) - if err == nil { - t.Errorf("expected error not found, got nil") - } - - if idx != nil { - t.Errorf("expected nil index, got %v", idx) - } - - if len(invalid) != 0 { - t.Errorf("expected empty invalid list, got %v", invalid) - } - } -} - -func TestIndexLoad(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - loadIdx, err := Load(context.TODO(), repo, nil) - if err != nil { - t.Fatalf("Load() returned error %v", err) - } - - if loadIdx == nil { - t.Fatalf("Load() returned nil index") - } - - validateIndex(t, repo, loadIdx) - - newIdx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) - if err != nil { - t.Fatalf("New() returned error %v", err) - } - - if len(loadIdx.Packs) != len(newIdx.Packs) { - t.Errorf("number of packs does not match: want %v, got %v", - len(loadIdx.Packs), len(newIdx.Packs)) - } - - validateIndex(t, repo, newIdx) - - for packID, packNew := range newIdx.Packs { - packLoad, ok := loadIdx.Packs[packID] - - if !ok { - t.Errorf("loaded index does not list pack %v", packID.Str()) - continue - } - - if len(packNew.Entries) != len(packLoad.Entries) { - t.Errorf(" number of entries in pack %v does not match: %d != %d\n %v\n %v", - packID.Str(), len(packNew.Entries), len(packLoad.Entries), - packNew.Entries, packLoad.Entries) - continue - } - - for _, entryNew := range packNew.Entries { - found := false - for _, entryLoad := range packLoad.Entries { - if !entryLoad.ID.Equal(entryNew.ID) { - continue - } - - if entryLoad.Type != entryNew.Type { - continue - } - - if entryLoad.Offset != entryNew.Offset { - continue - } - - if entryLoad.Length != entryNew.Length { - continue - } - - found = true - break - } - - if !found { - t.Errorf("blob not found in loaded index: %v", entryNew) - } - } - } -} - -func BenchmarkIndexNew(b *testing.B) { - repo, cleanup := createFilledRepo(b, 3, 0) - defer cleanup() - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) - - if err != nil { - b.Fatalf("New() returned error %v", err) - } - - if idx == nil { - b.Fatalf("New() returned nil index") - } - b.Logf("idx %v packs", len(idx.Packs)) - } -} - -func BenchmarkIndexSave(b *testing.B) { - repo, cleanup := repository.TestRepository(b) - defer cleanup() - - idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) - test.OK(b, err) - - for i := 0; i < 8000; i++ { - entries := make([]restic.Blob, 0, 200) - for j := 0; j < cap(entries); j++ { - entries = append(entries, restic.Blob{ - ID: restic.NewRandomID(), - Length: 1000, - Offset: 5, - Type: restic.DataBlob, - }) - } - - idx.AddPack(restic.NewRandomID(), 10000, entries) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - ids, err := idx.Save(context.TODO(), repo, nil) - if err != nil { - b.Fatalf("New() returned error %v", err) - } - - b.Logf("saved as %v", ids) - } -} - -func TestIndexDuplicateBlobs(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0.05) - defer cleanup() - - idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) - if err != nil { - t.Fatal(err) - } - - dups := idx.DuplicateBlobs() - if len(dups) == 0 { - t.Errorf("no duplicate blobs found") - } - t.Logf("%d packs, %d duplicate blobs", len(idx.Packs), len(dups)) - - packs := idx.PacksForBlobs(dups) - if len(packs) == 0 { - t.Errorf("no packs with duplicate blobs found") - } - t.Logf("%d packs with duplicate blobs", len(packs)) -} - -func loadIndex(t testing.TB, repo restic.Repository) *Index { - idx, err := Load(context.TODO(), repo, nil) - if err != nil { - t.Fatalf("Load() returned error %v", err) - } - - return idx -} - -func TestIndexSave(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - idx := loadIndex(t, repo) - - ids, err := idx.Save(context.TODO(), repo, idx.IndexIDs.List()) - if err != nil { - t.Fatalf("unable to save new index: %v", err) - } - - t.Logf("new index saved as %v", ids) - - for id := range idx.IndexIDs { - t.Logf("remove index %v", id.Str()) - h := restic.Handle{Type: restic.IndexFile, Name: id.String()} - err = repo.Backend().Remove(context.TODO(), h) - if err != nil { - t.Errorf("error removing index %v: %v", id, err) - } - } - - idx2 := loadIndex(t, repo) - t.Logf("load new index with %d packs", len(idx2.Packs)) - - checker := checker.New(repo) - hints, errs := checker.LoadIndex(context.TODO()) - for _, h := range hints { - t.Logf("hint: %v\n", h) - } - - for _, err := range errs { - t.Errorf("checker found error: %v", err) - } - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - errCh := make(chan error) - go checker.Structure(ctx, errCh) - i := 0 - for err := range errCh { - t.Errorf("checker returned error: %v", err) - i++ - if i == 10 { - t.Errorf("more than 10 errors returned, skipping the rest") - cancel() - break - } - } -} - -// Location describes the location of a blob in a pack. -type location struct { - PackID restic.ID - restic.Blob -} - -// FindBlob returns a list of packs and positions the blob can be found in. -func (idx *Index) findBlob(h restic.BlobHandle) (result []location) { - for id, p := range idx.Packs { - for _, entry := range p.Entries { - if entry.ID.Equal(h.ID) && entry.Type == h.Type { - result = append(result, location{ - PackID: id, - Blob: entry, - }) - } - } - } - - return result -} - -func TestIndexAddRemovePack(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0) - defer cleanup() - - idx, err := Load(context.TODO(), repo, nil) - if err != nil { - t.Fatalf("Load() returned error %v", err) - } - - var packID restic.ID - err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { - packID = id - return nil - }) - if err != nil { - t.Fatal(err) - } - - t.Logf("selected pack %v", packID.Str()) - - blobs := idx.Packs[packID].Entries - - idx.RemovePack(packID) - - if _, ok := idx.Packs[packID]; ok { - t.Errorf("removed pack %v found in index.Packs", packID.Str()) - } - - for _, blob := range blobs { - h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} - locs := idx.findBlob(h) - if len(locs) != 0 { - t.Errorf("removed blob %v found in index", h) - } - } -} - -// example index serialization from doc/Design.rst -var docExample = []byte(` -{ - "supersedes": [ - "ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452" - ], - "packs": [ - { - "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", - "blobs": [ - { - "id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", - "type": "data", - "offset": 0, - "length": 25 - },{ - "id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", - "type": "tree", - "offset": 38, - "length": 100 - }, - { - "id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66", - "type": "data", - "offset": 150, - "length": 123 - } - ] - } - ] -} -`) - -func TestIndexLoadDocReference(t *testing.T) { - repo, cleanup := repository.TestRepository(t) - defer cleanup() - - id, err := repo.SaveUnpacked(context.TODO(), restic.IndexFile, docExample) - if err != nil { - t.Fatalf("SaveUnpacked() returned error %v", err) - } - - t.Logf("index saved as %v", id.Str()) - - idx := loadIndex(t, repo) - - blobID := restic.TestParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66") - locs := idx.findBlob(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) - if len(locs) == 0 { - t.Error("blob not found in index") - } - - if len(locs) != 1 { - t.Errorf("blob found %d times, expected just one", len(locs)) - } - - l := locs[0] - if !l.ID.Equal(blobID) { - t.Errorf("blob IDs are not equal: %v != %v", l.ID, blobID) - } - - if l.Type != restic.DataBlob { - t.Errorf("want type %v, got %v", restic.DataBlob, l.Type) - } - - if l.Offset != 150 { - t.Errorf("wrong offset, want %d, got %v", 150, l.Offset) - } - - if l.Length != 123 { - t.Errorf("wrong length, want %d, got %v", 123, l.Length) - } -} diff --git a/internal/repository/index.go b/internal/repository/index.go index 30fe3ddb9..997e9b708 100644 --- a/internal/repository/index.go +++ b/internal/repository/index.go @@ -281,7 +281,10 @@ type EachByPackResult struct { } // EachByPack returns a channel that yields all blobs known to the index -// grouped by packID but ignoring blobs with a packID in packPlacklist. +// grouped by packID but ignoring blobs with a packID in packPlacklist for +// finalized indexes. +// This filtering is used when rebuilding the index where we need to ignore packs +// from the finalized index which have been re-read into a non-finalized index. // When the context is cancelled, the background goroutine // terminates. This blocks any modification of the index. func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult { @@ -300,7 +303,7 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <- m := &idx.byType[typ] m.foreach(func(e *indexEntry) bool { packID := idx.packs[e.packIndex] - if !packBlacklist.Has(packID) { + if !idx.final || !packBlacklist.Has(packID) { byPack[packID] = append(byPack[packID], e) } return true diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index 3cbcad0a1..041bbc669 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -7,6 +7,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" ) // MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved. @@ -261,12 +262,14 @@ func (mi *MasterIndex) MergeFinalIndexes() { mi.idx = newIdx } +const saveIndexParallelism = 4 + // Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist. The new index contains the IDs -// of all known indexes in the "supersedes" field. The IDs are also returned in -// the IDSet obsolete +// packs whose ID is contained in packBlacklist from finalized indexes. +// The new index contains the IDs of all known indexes in the "supersedes" +// field. The IDs are also returned in the IDSet obsolete. // After calling this function, you should remove the obsolete index files. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *progress.Counter) (obsolete restic.IDSet, err error) { +func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { mi.idxMutex.Lock() defer mi.idxMutex.Unlock() @@ -275,49 +278,79 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla newIndex := NewIndex() obsolete = restic.NewIDSet() - finalize := func() error { - newIndex.Finalize() - if _, err := SaveIndex(ctx, repo, newIndex); err != nil { - return err - } - newIndex = NewIndex() - return nil - } + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) - for i, idx := range mi.idx { - if idx.Final() { - ids, err := idx.IDs() - if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return nil, err + ch := make(chan *Index) + + wg.Go(func() error { + defer close(ch) + for i, idx := range mi.idx { + if idx.Final() { + ids, err := idx.IDs() + if err != nil { + debug.Log("index %d does not have an ID: %v", err) + return err + } + + debug.Log("adding index ids %v to supersedes field", ids) + + err = newIndex.AddToSupersedes(ids...) + if err != nil { + return err + } + obsolete.Merge(restic.NewIDSet(ids...)) + } else { + debug.Log("index %d isn't final, don't add to supersedes field", i) } - debug.Log("adding index ids %v to supersedes field", ids) + debug.Log("adding index %d", i) - err = newIndex.AddToSupersedes(ids...) - if err != nil { - return nil, err - } - obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) - } - - debug.Log("adding index %d", i) - - for pbs := range idx.EachByPack(ctx, packBlacklist) { - newIndex.StorePack(pbs.packID, pbs.blobs) - p.Add(1) - if IndexFull(newIndex) { - if err := finalize(); err != nil { - return nil, err + for pbs := range idx.EachByPack(ctx, packBlacklist) { + newIndex.StorePack(pbs.packID, pbs.blobs) + p.Add(1) + if IndexFull(newIndex) { + select { + case ch <- newIndex: + case <-ctx.Done(): + return nil + } + newIndex = NewIndex() } } } - } - if err := finalize(); err != nil { - return nil, err + + err = newIndex.AddToSupersedes(extraObsolete...) + if err != nil { + return err + } + obsolete.Merge(restic.NewIDSet(extraObsolete...)) + + select { + case ch <- newIndex: + case <-ctx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range ch { + idx.Finalize() + if _, err := SaveIndex(ctx, repo, idx); err != nil { + return err + } + } + return nil } - return + // run workers on ch + wg.Go(func() error { + return RunWorkers(saveIndexParallelism, worker) + }) + + err = wg.Wait() + + return obsolete, err } diff --git a/internal/repository/master_index_test.go b/internal/repository/master_index_test.go index aa9905321..729f088ac 100644 --- a/internal/repository/master_index_test.go +++ b/internal/repository/master_index_test.go @@ -346,7 +346,7 @@ func TestIndexSave(t *testing.T) { repo.LoadIndex(context.TODO()) - obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil) + obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil, nil) if err != nil { t.Fatalf("unable to save new index: %v", err) } diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 87470d815..a1dffbecf 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" ) @@ -163,7 +162,21 @@ func saveIndex(t *testing.T, repo restic.Repository) { } func rebuildIndex(t *testing.T, repo restic.Repository) { - idx, _, err := index.New(context.TODO(), repo, restic.NewIDSet(), nil) + err := repo.SetIndex(repository.NewMasterIndex()) + if err != nil { + t.Fatal(err) + } + + packs := make(map[restic.ID]int64) + err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { + packs[id] = size + return nil + }) + if err != nil { + t.Fatal(err) + } + + _, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil) if err != nil { t.Fatal(err) } @@ -179,7 +192,9 @@ func rebuildIndex(t *testing.T, repo restic.Repository) { t.Fatal(err) } - _, err = idx.Save(context.TODO(), repo, nil) + _, err = (repo.Index()).(*repository.MasterIndex). + Save(context.TODO(), repo, restic.NewIDSet(), nil, nil) + if err != nil { t.Fatal(err) } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 051b99252..f40ce9097 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/restic/chunker" "github.com/restic/restic/internal/cache" @@ -17,6 +18,7 @@ import ( "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" "github.com/minio/sha256-simd" "golang.org/x/sync/errgroup" @@ -515,6 +517,73 @@ func (r *Repository) LoadIndex(ctx context.Context) error { return nil } +const listPackParallelism = 10 + +// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). +// The index is added to the MasterIndex but not marked as finalized. +// Returned is the list of pack files which could not be read. +func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { + var m sync.Mutex + + debug.Log("Loading index from pack files") + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + type FileInfo struct { + restic.ID + Size int64 + } + ch := make(chan FileInfo) + + // send list of pack files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + for id, size := range packsize { + select { + case <-ctx.Done(): + return nil + case ch <- FileInfo{id, size}: + } + } + return nil + }) + + idx := NewIndex() + // a worker receives an pack ID from ch, reads the pack contents, and adds them to idx + worker := func() error { + for fi := range ch { + entries, _, err := r.ListPack(ctx, fi.ID, fi.Size) + if err != nil { + debug.Log("unable to list pack file %v", fi.ID.Str()) + m.Lock() + invalid = append(invalid, fi.ID) + m.Unlock() + } + idx.StorePack(fi.ID, entries) + p.Add(1) + } + + return nil + } + + // run workers on ch + wg.Go(func() error { + return RunWorkers(listPackParallelism, worker) + }) + + err = wg.Wait() + if err != nil { + return invalid, errors.Fatal(err.Error()) + } + + // Add idx to MasterIndex + r.idx.Insert(idx) + + return invalid, nil +} + // PrepareCache initializes the local cache. indexIDs is the list of IDs of // index files still present in the repo. func (r *Repository) PrepareCache(indexIDs restic.IDSet) error {