diff --git a/backend/idset.go b/backend/idset.go
index 4f27f3489..817f80fff 100644
--- a/backend/idset.go
+++ b/backend/idset.go
@@ -39,6 +39,27 @@ func (s IDSet) List() IDs {
 	return list
 }
 
+// Equals returns true iff s equals other.
+func (s IDSet) Equals(other IDSet) bool {
+	if len(s) != len(other) {
+		return false
+	}
+
+	for id := range s {
+		if _, ok := other[id]; !ok {
+			return false
+		}
+	}
+
+	for id := range other {
+		if _, ok := s[id]; !ok {
+			return false
+		}
+	}
+
+	return true
+}
+
 func (s IDSet) String() string {
 	str := s.List().String()
 	if len(str) < 2 {
diff --git a/backend/local/local.go b/backend/local/local.go
index 487397353..ffcfe68d6 100644
--- a/backend/local/local.go
+++ b/backend/local/local.go
@@ -223,9 +223,11 @@ func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.
 	b.open[filename(b.p, t, name)] = append(open, f)
 	b.mu.Unlock()
 
-	_, err = f.Seek(int64(offset), 0)
-	if err != nil {
-		return nil, err
+	if offset > 0 {
+		_, err = f.Seek(int64(offset), 0)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	if length == 0 {
diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go
index 13bd54340..c904bf4bc 100644
--- a/backend/sftp/sftp.go
+++ b/backend/sftp/sftp.go
@@ -336,9 +336,11 @@ func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.R
 		return nil, err
 	}
 
-	_, err = f.Seek(int64(offset), 0)
-	if err != nil {
-		return nil, err
+	if offset > 0 {
+		_, err = f.Seek(int64(offset), 0)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	if length == 0 {
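Note: IDSet.Equals above compares by membership, so insertion order is irrelevant; because the lengths are compared first, the second loop is logically redundant, but it keeps the check symmetric. A usage sketch (id1 and id2 are hypothetical backend.IDs):

	a := backend.NewIDSet()
	b := backend.NewIDSet()
	a.Insert(id1)
	a.Insert(id2)
	b.Insert(id2)
	b.Insert(id1)
	_ = a.Equals(b) // true: same members, different insertion order

The two GetReader changes guard the Seek call so that it only runs for a non-zero offset; whole-file reads no longer pay for a needless seek.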
diff --git a/checker/checker.go b/checker/checker.go
index d6fe49eb4..2c9536f14 100644
--- a/checker/checker.go
+++ b/checker/checker.go
@@ -49,8 +49,18 @@ func New(repo *repository.Repository) *Checker {
 
 const defaultParallelism = 40
 
+// ErrDuplicatePacks is returned when a pack is found in more than one index.
+type ErrDuplicatePacks struct {
+	PackID  backend.ID
+	Indexes backend.IDSet
+}
+
+func (e ErrDuplicatePacks) Error() string {
+	return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
+}
+
 // LoadIndex loads all index files.
-func (c *Checker) LoadIndex() error {
+func (c *Checker) LoadIndex() (hints []error, errs []error) {
 	debug.Log("LoadIndex", "Start")
 	type indexRes struct {
 		Index *repository.Index
@@ -97,14 +107,22 @@ func (c *Checker) LoadIndex() error {
 	done := make(chan struct{})
 	defer close(done)
 
+	if perr != nil {
+		errs = append(errs, perr)
+		return hints, errs
+	}
+
+	packToIndex := make(map[backend.ID]backend.IDSet)
+
 	for res := range indexCh {
 		debug.Log("LoadIndex", "process index %v", res.ID)
-		id, err := backend.ParseID(res.ID)
+		idxID, err := backend.ParseID(res.ID)
 		if err != nil {
-			return err
+			errs = append(errs, fmt.Errorf("unable to parse as index ID: %v", res.ID))
+			continue
 		}
 
-		c.indexes[id] = res.Index
+		c.indexes[idxID] = res.Index
 		c.masterIndex.Insert(res.Index)
 
 		debug.Log("LoadIndex", "process blobs")
@@ -114,6 +132,11 @@ func (c *Checker) LoadIndex() error {
 			c.blobs[blob.ID] = struct{}{}
 			c.blobRefs.M[blob.ID] = 0
 			cnt++
+
+			if _, ok := packToIndex[blob.PackID]; !ok {
+				packToIndex[blob.PackID] = backend.NewIDSet()
+			}
+			packToIndex[blob.PackID].Insert(idxID)
 		}
 
 		debug.Log("LoadIndex", "%d blobs processed", cnt)
@@ -121,9 +144,20 @@ func (c *Checker) LoadIndex() error {
 
 	debug.Log("LoadIndex", "done, error %v", perr)
 
+	debug.Log("LoadIndex", "checking for duplicate packs")
+	for packID := range c.packs {
+		debug.Log("LoadIndex", " check pack %v: contained in %d indexes", packID.Str(), len(packToIndex[packID]))
+		if len(packToIndex[packID]) > 1 {
+			hints = append(hints, ErrDuplicatePacks{
+				PackID:  packID,
+				Indexes: packToIndex[packID],
+			})
+		}
+	}
+
 	c.repo.SetIndex(c.masterIndex)
 
-	return perr
+	return hints, errs
 }
 
 // PackError describes an error with a specific pack.
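Note: LoadIndex now returns two slices: hints carries non-fatal findings (currently only ErrDuplicatePacks) that callers should surface to the user, while errs carries hard failures. A minimal sketch of the calling convention, mirroring what cmd_check.go below does (chkr is a *checker.Checker):

	hints, errs := chkr.LoadIndex()
	for _, hint := range hints {
		fmt.Printf("%v\n", hint) // e.g. "pack ... contained in several indexes: ..."
	}
	if len(errs) > 0 {
		return fmt.Errorf("LoadIndex returned errors")
	}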
diff --git a/checker/checker_test.go b/checker/checker_test.go
index 3f5362a78..37d7f7a7b 100644
--- a/checker/checker_test.go
+++ b/checker/checker_test.go
@@ -59,7 +59,15 @@ func TestCheckRepo(t *testing.T) {
 		repo := OpenLocalRepo(t, repodir)
 
 		chkr := checker.New(repo)
-		OK(t, chkr.LoadIndex())
+		hints, errs := chkr.LoadIndex()
+		if len(errs) > 0 {
+			t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+		}
+
+		if len(hints) > 0 {
+			t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+		}
+
 		OKs(t, checkPacks(chkr))
 		OKs(t, checkStruct(chkr))
 	})
@@ -73,8 +81,16 @@ func TestMissingPack(t *testing.T) {
 		OK(t, repo.Backend().Remove(backend.Data, packID))
 
 		chkr := checker.New(repo)
-		OK(t, chkr.LoadIndex())
-		errs := checkPacks(chkr)
+		hints, errs := chkr.LoadIndex()
+		if len(errs) > 0 {
+			t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+		}
+
+		if len(hints) > 0 {
+			t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+		}
+
+		errs = checkPacks(chkr)
 		Assert(t, len(errs) == 1,
 			"expected exactly one error, got %v", len(errs))
 
@@ -97,8 +113,16 @@ func TestUnreferencedPack(t *testing.T) {
 		OK(t, repo.Backend().Remove(backend.Index, indexID))
 
 		chkr := checker.New(repo)
-		OK(t, chkr.LoadIndex())
-		errs := checkPacks(chkr)
+		hints, errs := chkr.LoadIndex()
+		if len(errs) > 0 {
+			t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+		}
+
+		if len(hints) > 0 {
+			t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+		}
+
+		errs = checkPacks(chkr)
 		Assert(t, len(errs) == 1,
 			"expected exactly one error, got %v", len(errs))
 
@@ -130,7 +154,15 @@ func TestUnreferencedBlobs(t *testing.T) {
 		sort.Sort(unusedBlobsBySnapshot)
 
 		chkr := checker.New(repo)
-		OK(t, chkr.LoadIndex())
+		hints, errs := chkr.LoadIndex()
+		if len(errs) > 0 {
+			t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
+		}
+
+		if len(hints) > 0 {
+			t.Errorf("expected no hints, got %v: %v", len(hints), hints)
+		}
+
 		OKs(t, checkPacks(chkr))
 		OKs(t, checkStruct(chkr))
 
@@ -140,3 +172,35 @@ func TestUnreferencedBlobs(t *testing.T) {
 		Equals(t, unusedBlobsBySnapshot, blobs)
 	})
 }
+
+var checkerDuplicateIndexTestData = filepath.Join("testdata", "duplicate-packs-in-index-test-repo.tar.gz")
+
+func TestDuplicatePacksInIndex(t *testing.T) {
+	WithTestEnvironment(t, checkerDuplicateIndexTestData, func(repodir string) {
+		repo := OpenLocalRepo(t, repodir)
+
+		chkr := checker.New(repo)
+		hints, errs := chkr.LoadIndex()
+		if len(hints) == 0 {
+			t.Fatalf("did not get expected checker hints for duplicate packs in indexes")
+		}
+
+		found := false
+		for _, hint := range hints {
+			if _, ok := hint.(checker.ErrDuplicatePacks); ok {
+				found = true
+			} else {
+				t.Errorf("got unexpected hint: %v", hint)
+			}
+		}
+
+		if !found {
+			t.Fatalf("did not find hint ErrDuplicatePacks")
+		}
+
+		if len(errs) > 0 {
+			t.Errorf("expected no errors, got %v: %v", len(errs), errs)
+		}
+
+	})
+}
diff --git a/checker/testdata/duplicate-packs-in-index-test-repo.tar.gz b/checker/testdata/duplicate-packs-in-index-test-repo.tar.gz
new file mode 100644
index 000000000..f0e194d8d
Binary files /dev/null and b/checker/testdata/duplicate-packs-in-index-test-repo.tar.gz differ
diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go
index 0d5993845..919568618 100644
--- a/cmd/restic/cmd_check.go
+++ b/cmd/restic/cmd_check.go
@@ -50,8 +50,25 @@ func (cmd CmdCheck) Execute(args []string) error {
 	chkr := checker.New(repo)
 
 	cmd.global.Verbosef("Load indexes\n")
-	if err = chkr.LoadIndex(); err != nil {
-		return err
+	hints, errs := chkr.LoadIndex()
+
+	dupFound := false
+	for _, hint := range hints {
+		cmd.global.Printf("%v\n", hint)
+		if _, ok := hint.(checker.ErrDuplicatePacks); ok {
+			dupFound = true
+		}
+	}
+
+	if dupFound {
+		cmd.global.Printf("\nrun `restic rebuild-index' to correct this\n")
+	}
+
+	if len(errs) > 0 {
+		for _, err := range errs {
+			cmd.global.Warnf("error: %v\n", err)
+		}
+		return fmt.Errorf("LoadIndex returned errors")
 	}
 
 	done := make(chan struct{})
diff --git a/cmd/restic/cmd_dump.go b/cmd/restic/cmd_dump.go
index 1a3ed4036..b7d456e4e 100644
--- a/cmd/restic/cmd_dump.go
+++ b/cmd/restic/cmd_dump.go
@@ -32,7 +32,7 @@ func init() {
 }
 
 func (cmd CmdDump) Usage() string {
-	return "[index|snapshots|trees|all]"
+	return "[indexes|snapshots|trees|all]"
 }
 
 func prettyPrintJSON(wr io.Writer, item interface{}) error {
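Note: the new file below implements the command the checker hint advertises. As implemented, rebuild-index lists all index files, merges their entries into a combined index (flushing an intermediate index whenever repository.IndexFull reports it full enough), records the old indexes as superseded, removes them, and finally reads the headers of any data packs that no index mentioned. The invocation the checker suggests is simply:

	$ restic rebuild-index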
diff --git a/cmd/restic/cmd_rebuild_index.go b/cmd/restic/cmd_rebuild_index.go
new file mode 100644
index 000000000..5444cacb2
--- /dev/null
+++ b/cmd/restic/cmd_rebuild_index.go
@@ -0,0 +1,188 @@
+package main
+
+import (
+	"bytes"
+	"io"
+	"io/ioutil"
+
+	"github.com/restic/restic/backend"
+	"github.com/restic/restic/debug"
+	"github.com/restic/restic/pack"
+	"github.com/restic/restic/repository"
+)
+
+type CmdRebuildIndex struct {
+	global *GlobalOptions
+
+	repo *repository.Repository
+}
+
+func init() {
+	_, err := parser.AddCommand("rebuild-index",
+		"rebuild the index",
+		"The rebuild-index command builds a new index",
+		&CmdRebuildIndex{global: &globalOpts})
+	if err != nil {
+		panic(err)
+	}
+}
+
+func (cmd CmdRebuildIndex) storeIndex(index *repository.Index) (*repository.Index, error) {
+	debug.Log("RebuildIndex.RebuildIndex", "saving index")
+
+	cmd.global.Printf(" saving new index\n")
+	id, err := repository.SaveIndex(cmd.repo, index)
+	if err != nil {
+		debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err)
+		return nil, err
+	}
+
+	debug.Log("RebuildIndex.RebuildIndex", "index saved as %v", id.Str())
+	index = repository.NewIndex()
+
+	return index, nil
+}
+
+func (cmd CmdRebuildIndex) RebuildIndex() error {
+	debug.Log("RebuildIndex.RebuildIndex", "start")
+
+	done := make(chan struct{})
+	defer close(done)
+
+	indexIDs := backend.NewIDSet()
+	for id := range cmd.repo.List(backend.Index, done) {
+		indexIDs.Insert(id)
+	}
+
+	cmd.global.Printf("rebuilding index from %d indexes\n", len(indexIDs))
+
+	debug.Log("RebuildIndex.RebuildIndex", "found %v indexes", len(indexIDs))
+
+	combinedIndex := repository.NewIndex()
+	packsDone := backend.NewIDSet()
+
+	i := 0
+	for indexID := range indexIDs {
+		cmd.global.Printf(" loading index %v\n", i)
+
+		debug.Log("RebuildIndex.RebuildIndex", "load index %v", indexID.Str())
+		idx, err := repository.LoadIndex(cmd.repo, indexID.String())
+		if err != nil {
+			return err
+		}
+
+		debug.Log("RebuildIndex.RebuildIndex", "adding blobs from index %v", indexID.Str())
+
+		for packedBlob := range idx.Each(done) {
+			combinedIndex.Store(packedBlob.Type, packedBlob.ID, packedBlob.PackID, packedBlob.Offset, packedBlob.Length)
+			packsDone.Insert(packedBlob.PackID)
+		}
+
+		combinedIndex.AddToSupersedes(indexID)
+
+		if repository.IndexFull(combinedIndex) {
+			combinedIndex, err = cmd.storeIndex(combinedIndex)
+			if err != nil {
+				return err
+			}
+		}
+
+		i++
+	}
+
+	var err error
+	if combinedIndex.Length() > 0 {
+		combinedIndex, err = cmd.storeIndex(combinedIndex)
+		if err != nil {
+			return err
+		}
+	}
+
+	cmd.global.Printf("removing %d old indexes\n", len(indexIDs))
+	for id := range indexIDs {
+		debug.Log("RebuildIndex.RebuildIndex", "remove index %v", id.Str())
+
+		err := cmd.repo.Backend().Remove(backend.Index, id.String())
+		if err != nil {
+			debug.Log("RebuildIndex.RebuildIndex", "error removing index %v: %v", id.Str(), err)
+			return err
+		}
+	}
+
+	cmd.global.Printf("checking for additional packs\n")
+	newPacks := 0
+	for packID := range cmd.repo.List(backend.Data, done) {
+		if packsDone.Has(packID) {
+			continue
+		}
+
+		debug.Log("RebuildIndex.RebuildIndex", "pack %v not indexed", packID.Str())
+		newPacks++
+
+		rd, err := cmd.repo.Backend().GetReader(backend.Data, packID.String(), 0, 0)
+		if err != nil {
+			debug.Log("RebuildIndex.RebuildIndex", "GetReader returned error: %v", err)
+			return err
+		}
+
+		var readSeeker io.ReadSeeker
+		if r, ok := rd.(io.ReadSeeker); ok {
+			debug.Log("RebuildIndex.RebuildIndex", "reader is seekable")
+			readSeeker = r
+		} else {
+			debug.Log("RebuildIndex.RebuildIndex", "reader is not seekable, loading contents to ram")
+			buf, err := ioutil.ReadAll(rd)
+			if err != nil {
+				return err
+			}
+
+			readSeeker = bytes.NewReader(buf)
+		}
+
+		up, err := pack.NewUnpacker(cmd.repo.Key(), readSeeker)
+		if err != nil {
+			debug.Log("RebuildIndex.RebuildIndex", "error while unpacking pack %v", packID.Str())
+			return err
+		}
+
+		for _, blob := range up.Entries {
+			debug.Log("RebuildIndex.RebuildIndex", "pack %v: blob %v", packID.Str(), blob)
+			combinedIndex.Store(blob.Type, blob.ID, packID, blob.Offset, blob.Length)
+		}
+
+		if repository.IndexFull(combinedIndex) {
+			combinedIndex, err = cmd.storeIndex(combinedIndex)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	if combinedIndex.Length() > 0 {
+		combinedIndex, err = cmd.storeIndex(combinedIndex)
+		if err != nil {
+			return err
+		}
+	}
+
+	cmd.global.Printf("added %d packs to the index\n", newPacks)
+
+	debug.Log("RebuildIndex.RebuildIndex", "done")
+	return nil
+}
+
+func (cmd CmdRebuildIndex) Execute(args []string) error {
+	repo, err := cmd.global.OpenRepository()
+	if err != nil {
+		return err
+	}
+	cmd.repo = repo
+
+	lock, err := lockRepoExclusive(repo)
+	defer unlockRepo(lock)
+	if err != nil {
+		return err
+	}
+
+	return cmd.RebuildIndex()
+}
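Note: the io.ReadSeeker handling in RebuildIndex above is a common Go pattern: try a cheap interface upgrade first, fall back to buffering. A self-contained sketch of just that step (rd is any io.Reader; error handling shortened):

	var rs io.ReadSeeker
	if r, ok := rd.(io.ReadSeeker); ok {
		// the backend reader already supports seeking
		rs = r
	} else {
		// otherwise load the whole pack into memory and wrap it
		buf, err := ioutil.ReadAll(rd)
		if err != nil {
			return err
		}
		rs = bytes.NewReader(buf)
	}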
diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go
index 360adca0f..24f8e52db 100644
--- a/cmd/restic/integration_test.go
+++ b/cmd/restic/integration_test.go
@@ -90,6 +90,19 @@ func cmdCheck(t testing.TB, global GlobalOptions) {
 	OK(t, cmd.Execute(nil))
 }
 
+func cmdCheckOutput(t testing.TB, global GlobalOptions) string {
+	buf := bytes.NewBuffer(nil)
+	global.stdout = buf
+	cmd := &CmdCheck{global: &global, ReadData: true}
+	OK(t, cmd.Execute(nil))
+	return buf.String()
+}
+
+func cmdRebuildIndex(t testing.TB, global GlobalOptions) {
+	cmd := &CmdRebuildIndex{global: &global}
+	OK(t, cmd.Execute(nil))
+}
+
 func cmdLs(t testing.TB, global GlobalOptions, snapshotID string) []string {
 	var buf bytes.Buffer
 	global.stdout = &buf
@@ -646,3 +659,26 @@ func TestFind(t *testing.T) {
 		Assert(t, len(results) < 2,
 			"less than two file found in repo (%v)", datafile)
 	})
 }
+
+func TestRebuildIndex(t *testing.T) {
+	withTestEnvironment(t, func(env *testEnvironment, global GlobalOptions) {
+		datafile := filepath.Join("..", "..", "checker", "testdata", "duplicate-packs-in-index-test-repo.tar.gz")
+		SetupTarTestFixture(t, env.base, datafile)
+
+		out := cmdCheckOutput(t, global)
+		if !strings.Contains(out, "contained in several indexes") {
+			t.Fatalf("did not find checker hint for packs in several indexes")
+		}
+
+		if !strings.Contains(out, "restic rebuild-index") {
+			t.Fatalf("did not find hint for rebuild-index command")
+		}
+
+		cmdRebuildIndex(t, global)
+
+		out = cmdCheckOutput(t, global)
+		if len(out) != 0 {
+			t.Fatalf("expected no output from the checker, got: %v", out)
+		}
+	})
+}
diff --git a/debug/debug.go b/debug/debug.go
index a76b66587..1e85650aa 100644
--- a/debug/debug.go
+++ b/debug/debug.go
@@ -121,7 +121,7 @@ func getPosition() string {
 
 	goroutine := goroutineNum()
 
-	return fmt.Sprintf("%3d %s:%3d", goroutine, filepath.Base(file), line)
+	return fmt.Sprintf("%3d %s:%d", goroutine, filepath.Base(file), line)
 }
 
 var maxTagLen = 10
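Note: in the pack.go change below, NewUnpacker drops its entries parameter and always parses the pack header from rd, so call sites lose the nil argument (k is a *crypto.Key, rd an io.ReadSeeker):

	np, err := pack.NewUnpacker(k, rd) // previously: pack.NewUnpacker(k, nil, rd)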
diff --git a/pack/pack.go b/pack/pack.go
index 727566dcf..697adb8aa 100644
--- a/pack/pack.go
+++ b/pack/pack.go
@@ -233,7 +233,7 @@ type Unpacker struct {
 
 // NewUnpacker returns a pointer to Unpacker which can be used to read
 // individual Blobs from a pack.
-func NewUnpacker(k *crypto.Key, entries []Blob, rd io.ReadSeeker) (*Unpacker, error) {
+func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) {
 	var err error
 
 	ls := binary.Size(uint32(0))
@@ -261,28 +261,28 @@ func NewUnpacker(k *crypto.Key, rd io.ReadSeeker) (*Unpacker, error) {
 		return nil, err
 	}
 
-	if entries == nil {
-		pos := uint(0)
-		for {
-			e := headerEntry{}
-			err = binary.Read(hrd, binary.LittleEndian, &e)
-			if err == io.EOF {
-				break
-			}
+	var entries []Blob
 
-			if err != nil {
-				return nil, err
-			}
-
-			entries = append(entries, Blob{
-				Type:   e.Type,
-				Length: uint(e.Length),
-				ID:     e.ID,
-				Offset: pos,
-			})
-
-			pos += uint(e.Length)
+	pos := uint(0)
+	for {
+		e := headerEntry{}
+		err = binary.Read(hrd, binary.LittleEndian, &e)
+		if err == io.EOF {
+			break
 		}
+
+		if err != nil {
+			return nil, err
+		}
+
+		entries = append(entries, Blob{
+			Type:   e.Type,
+			Length: uint(e.Length),
+			ID:     e.ID,
+			Offset: pos,
+		})
+
+		pos += uint(e.Length)
 	}
 
 	p := &Unpacker{
diff --git a/pack/pack_test.go b/pack/pack_test.go
index c646bc725..28ef4c22c 100644
--- a/pack/pack_test.go
+++ b/pack/pack_test.go
@@ -67,7 +67,7 @@ func TestCreatePack(t *testing.T) {
 
 	// read and parse it again
 	rd := bytes.NewReader(file.Bytes())
-	np, err := pack.NewUnpacker(k, nil, rd)
+	np, err := pack.NewUnpacker(k, rd)
 	OK(t, err)
 	Equals(t, len(np.Entries), len(bufs))
 
@@ -85,7 +85,7 @@ func TestCreatePack(t *testing.T) {
 	}
 }
 
-var blobTypeJson = []struct {
+var blobTypeJSON = []struct {
 	t   pack.BlobType
 	res string
 }{
@@ -94,7 +94,7 @@
 }
 
 func TestBlobTypeJSON(t *testing.T) {
-	for _, test := range blobTypeJson {
+	for _, test := range blobTypeJSON {
 		// test serialize
 		buf, err := json.Marshal(test.t)
 		OK(t, err)
diff --git a/repository/index.go b/repository/index.go
index 006944fb6..25e4c5143 100644
--- a/repository/index.go
+++ b/repository/index.go
@@ -26,7 +26,7 @@ type Index struct {
 
 type indexEntry struct {
 	tpe    pack.BlobType
-	packID *backend.ID
+	packID backend.ID
 	offset uint
 	length uint
 }
@@ -39,7 +39,7 @@ func NewIndex() *Index {
 	}
 }
 
-func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
+func (idx *Index) store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
 	idx.pack[id] = indexEntry{
 		tpe:    t,
 		packID: pack,
@@ -64,8 +64,8 @@ const (
 	indexMaxAge = 15 * time.Minute
 )
 
-// Full returns true iff the index is "full enough" to be saved as a preliminary index.
-func (idx *Index) Full() bool {
+// IndexFull returns true iff the index is "full enough" to be saved as a preliminary index.
+var IndexFull = func(idx *Index) bool {
 	idx.m.Lock()
 	defer idx.m.Unlock()
 
@@ -95,7 +95,7 @@
 
 // Store remembers the id and pack in the index. An existing entry will be
 // silently overwritten.
-func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
+func (idx *Index) Store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
 	idx.m.Lock()
 	defer idx.m.Unlock()
 
@@ -109,7 +109,7 @@
 }
 
 // Lookup returns the pack for the id.
-func (idx *Index) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
+func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) {
 	idx.m.Lock()
 	defer idx.m.Unlock()
 
@@ -121,7 +121,7 @@ func (idx *Index) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType,
 	}
 
 	debug.Log("Index.Lookup", "id %v not found", id.Str())
-	return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in index", id)
+	return backend.ID{}, pack.Data, 0, 0, fmt.Errorf("id %v not found in index", id)
 }
 
 // Has returns true iff the id is listed in the index.
@@ -165,6 +165,20 @@ func (idx *Index) Supersedes() backend.IDs {
 	return idx.supersedes
 }
 
+// AddToSupersedes adds the ids to the list of indexes superseded by this
+// index. If the index has already been finalized, an error is returned.
+func (idx *Index) AddToSupersedes(ids ...backend.ID) error {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	if idx.final {
+		return errors.New("index already finalized")
+	}
+
+	idx.supersedes = append(idx.supersedes, ids...)
+	return nil
+}
+
 // PackedBlob is a blob already saved within a pack.
 type PackedBlob struct {
 	pack.Blob
@@ -196,7 +210,7 @@ func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
 						Type:   blob.tpe,
 						Length: blob.length,
 					},
-					PackID: *blob.packID,
+					PackID: blob.packID,
 				}:
 			}
 		}
@@ -205,6 +219,19 @@ func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
 	return ch
 }
 
+// Packs returns all packs in this index
+func (idx *Index) Packs() backend.IDSet {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	packs := backend.NewIDSet()
+	for _, entry := range idx.pack {
+		packs.Insert(entry.packID)
+	}
+
+	return packs
+}
+
 // Count returns the number of blobs of type t in the index.
 func (idx *Index) Count(t pack.BlobType) (n uint) {
 	debug.Log("Index.Count", "counting blobs of type %v", t)
@@ -221,6 +248,15 @@ func (idx *Index) Count(t pack.BlobType) (n uint) {
 	return
 }
 
+// Length returns the number of entries in the Index.
+func (idx *Index) Length() uint {
+	debug.Log("Index.Length", "counting blobs")
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	return uint(len(idx.pack))
+}
+
 type packJSON struct {
 	ID    backend.ID `json:"id"`
 	Blobs []blobJSON `json:"blobs"`
 }
 
 type blobJSON struct {
 	ID     backend.ID    `json:"id"`
 	Type   pack.BlobType `json:"type"`
 	Offset uint          `json:"offset"`
 	Length uint          `json:"length"`
 }
 
-// generatePackList returns a list of packs containing only the index entries
-// that selsectFn returned true for. If selectFn is nil, the list contains all
-// blobs in the index.
-func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON, error) {
+// generatePackList returns a list of packs.
+func (idx *Index) generatePackList() ([]*packJSON, error) {
 	list := []*packJSON{}
 	packs := make(map[backend.ID]*packJSON)
 
 	for id, blob := range idx.pack {
-		if blob.packID == nil {
-			panic("nil pack id")
-		}
-
-		if selectFn != nil && !selectFn(blob) {
-			continue
+		if blob.packID.IsNull() {
+			panic("null pack id")
 		}
 
 		debug.Log("Index.generatePackList", "handle blob %v", id.Str())
@@ -258,10 +288,10 @@ func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON,
 		}
 
 		// see if pack is already in map
-		p, ok := packs[*blob.packID]
+		p, ok := packs[blob.packID]
 		if !ok {
 			// else create new pack
-			p = &packJSON{ID: *blob.packID}
+			p = &packJSON{ID: blob.packID}
 
 			// and append it to the list and map
 			list = append(list, p)
@@ -302,7 +332,7 @@ func (idx *Index) Encode(w io.Writer) error {
 func (idx *Index) encode(w io.Writer) error {
 	debug.Log("Index.encode", "encoding index")
 
-	list, err := idx.generatePackList(nil)
+	list, err := idx.generatePackList()
 	if err != nil {
 		return err
 	}
@@ -332,7 +362,7 @@ func (idx *Index) Dump(w io.Writer) error {
 	idx.m.Lock()
 	defer idx.m.Unlock()
 
-	list, err := idx.generatePackList(nil)
+	list, err := idx.generatePackList()
 	if err != nil {
 		return err
 	}
@@ -386,7 +416,7 @@ func DecodeIndex(rd io.Reader) (idx *Index, err error) {
 	idx = NewIndex()
 	for _, pack := range idxJSON.Packs {
 		for _, blob := range pack.Blobs {
-			idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
+			idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
 		}
 	}
 	idx.supersedes = idxJSON.Supersedes
@@ -411,7 +441,7 @@ func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
 	idx = NewIndex()
 	for _, pack := range list {
 		for _, blob := range pack.Blobs {
-			idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
+			idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
 		}
 	}
 
diff --git a/repository/index_test.go b/repository/index_test.go
index 4b713eaa7..331ce6a7f 100644
--- a/repository/index_test.go
+++ b/repository/index_test.go
@@ -41,7 +41,7 @@ func TestIndexSerialize(t *testing.T) {
 		for j := 0; j < 20; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, &packID, pos, length)
+			idx.Store(pack.Data, id, packID, pos, length)
 
 			tests = append(tests, testEntry{
 				id:     id,
@@ -72,7 +72,7 @@ func TestIndexSerialize(t *testing.T) {
 		packID, tpe, offset, length, err := idx.Lookup(testBlob.id)
 		OK(t, err)
 
-		Equals(t, testBlob.pack, *packID)
+		Equals(t, testBlob.pack, packID)
 		Equals(t, testBlob.tpe, tpe)
 		Equals(t, testBlob.offset, offset)
 		Equals(t, testBlob.length, length)
@@ -80,7 +80,7 @@ func TestIndexSerialize(t *testing.T) {
 		packID, tpe, offset, length, err = idx2.Lookup(testBlob.id)
 		OK(t, err)
 
-		Equals(t, testBlob.pack, *packID)
+		Equals(t, testBlob.pack, packID)
 		Equals(t, testBlob.tpe, tpe)
 		Equals(t, testBlob.offset, offset)
 		Equals(t, testBlob.length, length)
@@ -95,7 +95,7 @@ func TestIndexSerialize(t *testing.T) {
 		for j := 0; j < 10; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, &packID, pos, length)
+			idx.Store(pack.Data, id, packID, pos, length)
 
 			newtests = append(newtests, testEntry{
 				id:     id,
@@ -129,7 +129,7 @@ func TestIndexSerialize(t *testing.T) {
 		packID, tpe, offset, length, err := idx3.Lookup(testBlob.id)
 		OK(t, err)
 
-		Equals(t, testBlob.pack, *packID)
+		Equals(t, testBlob.pack, packID)
 		Equals(t, testBlob.tpe, tpe)
 		Equals(t, testBlob.offset, offset)
 		Equals(t, testBlob.length, length)
@@ -148,7 +148,7 @@ func TestIndexSize(t *testing.T) {
 		for j := 0; j < blobs; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, &packID, pos, length)
+			idx.Store(pack.Data, id, packID, pos, length)
 
 			pos += length
 		}
@@ -250,7 +250,7 @@ func TestIndexUnserialize(t *testing.T) {
 		packID, tpe, offset, length, err := idx.Lookup(test.id)
 		OK(t, err)
 
-		Equals(t, test.packID, *packID)
+		Equals(t, test.packID, packID)
 		Equals(t, test.tpe, tpe)
 		Equals(t, test.offset, offset)
 		Equals(t, test.length, length)
@@ -267,7 +267,7 @@ func TestIndexUnserializeOld(t *testing.T) {
 		packID, tpe, offset, length, err := idx.Lookup(test.id)
 		OK(t, err)
 
-		Equals(t, test.packID, *packID)
+		Equals(t, test.packID, packID)
 		Equals(t, test.tpe, tpe)
 		Equals(t, test.offset, offset)
 		Equals(t, test.length, length)
@@ -313,7 +313,7 @@ func TestConvertIndex(t *testing.T) {
 			packID, tpe, offset, length, err := oldIndex.Lookup(packedBlob.ID)
 			OK(t, err)
 
-			Assert(t, *packID == packedBlob.PackID,
+			Assert(t, packID == packedBlob.PackID,
 				"Check blob %v: pack ID %v != %v", packedBlob.ID, packID, packedBlob.PackID)
 			Assert(t, tpe == packedBlob.Type,
 				"Check blob %v: Type %v != %v", packedBlob.ID, tpe, packedBlob.Type)
@@ -325,3 +325,18 @@ func TestConvertIndex(t *testing.T) {
 		}
 	})
 }
+
+func TestIndexPacks(t *testing.T) {
+	idx := repository.NewIndex()
+	packs := backend.NewIDSet()
+
+	for i := 0; i < 20; i++ {
+		packID := randomID()
+		idx.Store(pack.Data, randomID(), packID, 0, 23)
+
+		packs.Insert(packID)
+	}
+
+	idxPacks := idx.Packs()
+	Assert(t, packs.Equals(idxPacks), "packs in index do not match packs added to index")
+}
diff --git a/repository/master_index.go b/repository/master_index.go
index 5a61569fa..4d52789af 100644
--- a/repository/master_index.go
+++ b/repository/master_index.go
@@ -33,7 +33,7 @@ func NewMasterIndex() *MasterIndex {
 }
 
 // Lookup queries all known Indexes for the ID and returns the first match.
-func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
+func (mi *MasterIndex) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) {
 	mi.idxMutex.RLock()
 	defer mi.idxMutex.RUnlock()
 
@@ -50,7 +50,7 @@ func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobT
 	}
 
 	debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str())
-	return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
+	return backend.ID{}, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
 }
 
 // LookupSize queries all known Indexes for the ID and returns the first match.
@@ -206,7 +206,7 @@ func (mi *MasterIndex) FullIndexes() []*Index {
 			continue
 		}
 
-		if idx.Full() {
+		if IndexFull(idx) {
 			debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
 			list = append(list, idx)
 		} else {
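Note: turning the method Full into the package-level variable IndexFull is what enables the incremental-index test further below: the test swaps in a policy that declares every index full, so each flush writes out a separate index file, and any pack recorded in more than one index would surface immediately:

	repository.IndexFull = func(*repository.Index) bool { return true }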
diff --git a/repository/repository.go b/repository/repository.go
index 670b71f5d..e841c692d 100644
--- a/repository/repository.go
+++ b/repository/repository.go
@@ -269,7 +269,7 @@ func (r *Repository) savePacker(p *pack.Packer) error {
 	// update blobs in the index
 	for _, b := range p.Blobs() {
 		debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
-		r.idx.Current().Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
+		r.idx.Current().Store(b.Type, b.ID, sid, b.Offset, uint(b.Length))
 		r.idx.RemoveFromInFlight(b.ID)
 	}
 
@@ -507,28 +507,37 @@ func (bw *BlobWriter) ID() backend.ID {
 	return bw.id
 }
 
+// SaveIndex saves an index to repo's backend.
+func SaveIndex(repo *Repository, index *Index) (backend.ID, error) {
+	blob, err := repo.CreateEncryptedBlob(backend.Index)
+	if err != nil {
+		return backend.ID{}, err
+	}
+
+	err = index.Finalize(blob)
+	if err != nil {
+		return backend.ID{}, err
+	}
+
+	err = blob.Close()
+	if err != nil {
+		return backend.ID{}, err
+	}
+
+	sid := blob.ID()
+	return sid, nil
+}
+
 // saveIndex saves all indexes in the backend.
 func (r *Repository) saveIndex(indexes ...*Index) error {
 	for i, idx := range indexes {
 		debug.Log("Repo.SaveIndex", "Saving index %d", i)
 
-		blob, err := r.CreateEncryptedBlob(backend.Index)
+		sid, err := SaveIndex(r, idx)
 		if err != nil {
 			return err
 		}
 
-		err = idx.Encode(blob)
-		if err != nil {
-			return err
-		}
-
-		err = blob.Close()
-		if err != nil {
-			return err
-		}
-
-		sid := blob.ID()
-
 		debug.Log("Repo.SaveIndex", "Saved index %d as %v", i, sid.Str())
 	}
diff --git a/repository/repository_test.go b/repository/repository_test.go
index 93daece63..3028bd838 100644
--- a/repository/repository_test.go
+++ b/repository/repository_test.go
@@ -6,6 +6,7 @@ import (
 	"crypto/sha256"
 	"encoding/json"
 	"io"
+	mrand "math/rand"
 	"path/filepath"
 	"testing"
 
@@ -220,3 +221,65 @@ func BenchmarkLoadIndex(b *testing.B) {
 		}
 	})
 }
+
+// saveRandomDataBlobs generates random data blobs and saves them to the repository.
+func saveRandomDataBlobs(t testing.TB, repo *repository.Repository, num int, sizeMax int) {
+	for i := 0; i < num; i++ {
+		size := mrand.Int() % sizeMax
+
+		buf := make([]byte, size)
+		_, err := io.ReadFull(rand.Reader, buf)
+		OK(t, err)
+
+		_, err = repo.SaveAndEncrypt(pack.Data, buf, nil)
+		OK(t, err)
+	}
+}
+
+func TestRepositoryIncrementalIndex(t *testing.T) {
+	repo := SetupRepo()
+	defer TeardownRepo(repo)
+
+	repository.IndexFull = func(*repository.Index) bool { return true }
+
+	// add 15 packs
+	for j := 0; j < 5; j++ {
+		// add 3 packs, write intermediate index
+		for i := 0; i < 3; i++ {
+			saveRandomDataBlobs(t, repo, 5, 1<<15)
+			OK(t, repo.Flush())
+		}
+
+		OK(t, repo.SaveFullIndex())
+	}
+
+	// add another 5 packs
+	for i := 0; i < 5; i++ {
+		saveRandomDataBlobs(t, repo, 5, 1<<15)
+		OK(t, repo.Flush())
+	}
+
+	// save final index
+	OK(t, repo.SaveIndex())
+
+	packEntries := make(map[backend.ID]map[backend.ID]struct{})
+
+	for id := range repo.List(backend.Index, nil) {
+		idx, err := repository.LoadIndex(repo, id.String())
+		OK(t, err)
+
+		for pb := range idx.Each(nil) {
+			if _, ok := packEntries[pb.PackID]; !ok {
+				packEntries[pb.PackID] = make(map[backend.ID]struct{})
+			}
+
+			packEntries[pb.PackID][id] = struct{}{}
+		}
+	}
+
+	for packID, ids := range packEntries {
+		if len(ids) > 1 {
+			t.Errorf("pack %v listed in %d indexes\n", packID, len(ids))
+		}
+	}
+}