Merge pull request #1584 from restic/limit-index-file-size

Limit index file size
This commit is contained in:
Alexander Neumann 2018-01-26 21:53:12 +01:00
commit 4b2f2b542d
4 changed files with 61 additions and 78 deletions

View File

@@ -73,12 +73,12 @@ func rebuildIndex(ctx context.Context, repo restic.Repository, ignorePacks resti
return err return err
} }
id, err := idx.Save(ctx, repo, supersedes) ids, err := idx.Save(ctx, repo, supersedes)
if err != nil { if err != nil {
return errors.Fatalf("unable to save index, last error was: %v", err) return errors.Fatalf("unable to save index, last error was: %v", err)
} }
Verbosef("saved new index as %v\n", id.Str()) Verbosef("saved new indexes as %v\n", ids)
Verbosef("remove %d old index files\n", len(supersedes)) Verbosef("remove %d old index files\n", len(supersedes))

View File

@@ -85,8 +85,8 @@ type blobJSON struct {
} }
type indexJSON struct { type indexJSON struct {
Supersedes restic.IDs `json:"supersedes,omitempty"` Supersedes restic.IDs `json:"supersedes,omitempty"`
Packs []*packJSON `json:"packs"` Packs []packJSON `json:"packs"`
} }
func loadIndexJSON(ctx context.Context, repo restic.Repository, id restic.ID) (*indexJSON, error) { func loadIndexJSON(ctx context.Context, repo restic.Repository, id restic.ID) (*indexJSON, error) {
@@ -257,26 +257,24 @@ func (idx *Index) FindBlob(h restic.BlobHandle) (result []Location, err error) {
return result, nil return result, nil
} }
const maxEntries = 3000
// Save writes the complete index to the repo. // Save writes the complete index to the repo.
func (idx *Index) Save(ctx context.Context, repo restic.Repository, supersedes restic.IDs) (restic.ID, error) { func (idx *Index) Save(ctx context.Context, repo restic.Repository, supersedes restic.IDs) (restic.IDs, error) {
packs := make(map[restic.ID][]restic.Blob, len(idx.Packs)) debug.Log("pack files: %d\n", len(idx.Packs))
for id, p := range idx.Packs {
packs[id] = p.Entries
}
return Save(ctx, repo, packs, supersedes) var indexIDs []restic.ID
}
// Save writes a new index containing the given packs. packs := 0
func Save(ctx context.Context, repo restic.Repository, packs map[restic.ID][]restic.Blob, supersedes restic.IDs) (restic.ID, error) { jsonIDX := &indexJSON{
idx := &indexJSON{
Supersedes: supersedes, Supersedes: supersedes,
Packs: make([]*packJSON, 0, len(packs)), Packs: make([]packJSON, 0, maxEntries),
} }
for packID, blobs := range packs { for packID, pack := range idx.Packs {
b := make([]blobJSON, 0, len(blobs)) debug.Log("%04d add pack %v with %d entries", packs, packID, len(pack.Entries))
for _, blob := range blobs { b := make([]blobJSON, 0, len(pack.Entries))
for _, blob := range pack.Entries {
b = append(b, blobJSON{ b = append(b, blobJSON{
ID: blob.ID, ID: blob.ID,
Type: blob.Type, Type: blob.Type,
@@ -285,13 +283,35 @@ func Save(ctx context.Context, repo restic.Repository, packs map[restic.ID][]res
}) })
} }
p := &packJSON{ p := packJSON{
ID: packID, ID: packID,
Blobs: b, Blobs: b,
} }
idx.Packs = append(idx.Packs, p) jsonIDX.Packs = append(jsonIDX.Packs, p)
packs++
if packs == maxEntries {
id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX)
if err != nil {
return nil, err
}
debug.Log("saved new index as %v", id)
indexIDs = append(indexIDs, id)
packs = 0
jsonIDX.Packs = jsonIDX.Packs[:0]
}
} }
return repo.SaveJSONUnpacked(ctx, restic.IndexFile, idx) if packs > 0 {
id, err := repo.SaveJSONUnpacked(ctx, restic.IndexFile, jsonIDX)
if err != nil {
return nil, err
}
debug.Log("saved new index as %v", id)
indexIDs = append(indexIDs, id)
}
return indexIDs, nil
} }

View File

@@ -2,7 +2,6 @@ package index
import ( import (
"context" "context"
"math/rand"
"testing" "testing"
"time" "time"
@@ -177,12 +176,12 @@ func BenchmarkIndexSave(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
id, err := idx.Save(context.TODO(), repo, nil) ids, err := idx.Save(context.TODO(), repo, nil)
if err != nil { if err != nil {
b.Fatalf("New() returned error %v", err) b.Fatalf("New() returned error %v", err)
} }
b.Logf("saved as %v", id.Str()) b.Logf("saved as %v", ids)
} }
} }
@@ -217,69 +216,18 @@ func loadIndex(t testing.TB, repo restic.Repository) *Index {
return idx return idx
} }
func TestSave(t *testing.T) {
repo, cleanup := createFilledRepo(t, 3, 0)
defer cleanup()
idx := loadIndex(t, repo)
packs := make(map[restic.ID][]restic.Blob)
for id := range idx.Packs {
if rand.Float32() < 0.5 {
packs[id] = idx.Packs[id].Entries
}
}
t.Logf("save %d/%d packs in a new index\n", len(packs), len(idx.Packs))
id, err := Save(context.TODO(), repo, packs, idx.IndexIDs.List())
if err != nil {
t.Fatalf("unable to save new index: %v", err)
}
t.Logf("new index saved as %v", id.Str())
for id := range idx.IndexIDs {
t.Logf("remove index %v", id.Str())
h := restic.Handle{Type: restic.IndexFile, Name: id.String()}
err = repo.Backend().Remove(context.TODO(), h)
if err != nil {
t.Errorf("error removing index %v: %v", id, err)
}
}
idx2 := loadIndex(t, repo)
t.Logf("load new index with %d packs", len(idx2.Packs))
if len(idx2.Packs) != len(packs) {
t.Errorf("wrong number of packs in new index, want %d, got %d", len(packs), len(idx2.Packs))
}
for id := range packs {
if _, ok := idx2.Packs[id]; !ok {
t.Errorf("pack %v is not contained in new index", id.Str())
}
}
for id := range idx2.Packs {
if _, ok := packs[id]; !ok {
t.Errorf("pack %v is not contained in new index", id.Str())
}
}
}
func TestIndexSave(t *testing.T) { func TestIndexSave(t *testing.T) {
repo, cleanup := createFilledRepo(t, 3, 0) repo, cleanup := createFilledRepo(t, 3, 0)
defer cleanup() defer cleanup()
idx := loadIndex(t, repo) idx := loadIndex(t, repo)
id, err := idx.Save(context.TODO(), repo, idx.IndexIDs.List()) ids, err := idx.Save(context.TODO(), repo, idx.IndexIDs.List())
if err != nil { if err != nil {
t.Fatalf("unable to save new index: %v", err) t.Fatalf("unable to save new index: %v", err)
} }
t.Logf("new index saved as %v", id.Str()) t.Logf("new index saved as %v", ids)
for id := range idx.IndexIDs { for id := range idx.IndexIDs {
t.Logf("remove index %v", id.Str()) t.Logf("remove index %v", id.Str())
@@ -302,6 +250,21 @@ func TestIndexSave(t *testing.T) {
for _, err := range errs { for _, err := range errs {
t.Errorf("checker found error: %v", err) t.Errorf("checker found error: %v", err)
} }
ctx, cancel := context.WithCancel(context.TODO())
errCh := make(chan error)
go checker.Structure(ctx, errCh)
i := 0
for err := range errCh {
t.Errorf("checker returned error: %v", err)
i++
if i == 10 {
t.Errorf("more than 10 errors returned, skipping the rest")
cancel()
break
}
}
} }
func TestIndexAddRemovePack(t *testing.T) { func TestIndexAddRemovePack(t *testing.T) {

View File

@@ -369,7 +369,7 @@ func (r *Repository) SaveFullIndex(ctx context.Context) error {
return r.saveIndex(ctx, r.idx.FullIndexes()...) return r.saveIndex(ctx, r.idx.FullIndexes()...)
} }
const loadIndexParallelism = 20 const loadIndexParallelism = 4
// LoadIndex loads all index files from the backend in parallel and stores them // LoadIndex loads all index files from the backend in parallel and stores them
// in the master index. The first error that occurred is returned. // in the master index. The first error that occurred is returned.