From 55ddd5317d657e7788b081a76bcaea1d48167498 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 26 Jul 2015 14:25:01 +0200
Subject: [PATCH 1/4] Reduce memory usage for fuse mount

This changes `repository.LoadBlob()` so that a destination buffer must
be provided, which enables the fuse code to use a buffer from a
`sync.Pool`. In addition, the buffers are released when the file is
closed. At the moment, the max memory usage is defined by the max file
size that is read in one go (e.g. with `cat`). It could be further
optimized by implementing an LRU caching scheme.
---
 cmd/restic/cmd_cat.go          | 15 +++++--
 cmd/restic/fuse/file.go        | 82 ++++++++++++++++++++++++-----------
 node.go                        | 13 +++++-
 repository/repository.go      | 26 ++++++++---
 repository/repository_test.go |  4 +-
 5 files changed, 101 insertions(+), 39 deletions(-)

diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go
index d190877a4..05f58c976 100644
--- a/cmd/restic/cmd_cat.go
+++ b/cmd/restic/cmd_cat.go
@@ -162,9 +162,18 @@ func (cmd CmdCat) Execute(args []string) error {
 		return err
 
 	case "blob":
-		data, err := repo.LoadBlob(pack.Data, id)
-		if err == nil {
-			_, err = os.Stdout.Write(data)
+		_, tpe, _, length, err := repo.Index().Lookup(id)
+		if err != nil {
+			return err
+		}
+
+		if tpe != pack.Data {
+			return errors.New("wrong type for blob")
+		}
+
+		buf := make([]byte, length)
+		data, err := repo.LoadBlob(pack.Data, id, buf)
+		if err != nil {
 			return err
 		}
 
diff --git a/cmd/restic/fuse/file.go b/cmd/restic/fuse/file.go
index 61b2ba170..895c844aa 100644
--- a/cmd/restic/fuse/file.go
+++ b/cmd/restic/fuse/file.go
@@ -1,6 +1,8 @@
 package fuse
 
 import (
+	"sync"
+
 	"github.com/restic/restic"
 	"github.com/restic/restic/pack"
 	"github.com/restic/restic/repository"
@@ -12,6 +14,7 @@ import (
 
 // Statically ensure that *file implements the given interface
 var _ = fs.HandleReader(&file{})
+var _ = fs.HandleReleaser(&file{})
 
 type file struct {
 	repo *repository.Repository
@@ -21,6 +24,14 @@ type file struct {
 	blobs [][]byte
 }
 
+const defaultBlobSize = 128 * 1024
+
+var blobPool = sync.Pool{
+	New: func() interface{} {
+		return make([]byte, defaultBlobSize)
+	},
+}
+
 func newFile(repo *repository.Repository, node *restic.Node) (*file, error) {
 	sizes := make([]uint32, len(node.Content))
 	for i, blobID := range node.Content {
@@ -48,50 +59,69 @@ func (f *file) Attr(ctx context.Context, a *fuse.Attr) error {
 
 func (f *file) getBlobAt(i int) (blob []byte, err error) {
 	if f.blobs[i] != nil {
-		blob = f.blobs[i]
-	} else {
-		blob, err = f.repo.LoadBlob(pack.Data, f.node.Content[i])
-		if err != nil {
-			return nil, err
-		}
-		f.blobs[i] = blob
+		return f.blobs[i], nil
 	}
 
+	buf := blobPool.Get().([]byte)
+	buf = buf[:cap(buf)]
+
+	if uint32(len(buf)) < f.sizes[i] {
+		if len(buf) > defaultBlobSize {
+			blobPool.Put(buf)
+		}
+		buf = make([]byte, f.sizes[i])
+	}
+
+	blob, err = f.repo.LoadBlob(pack.Data, f.node.Content[i], buf)
+	if err != nil {
+		return nil, err
+	}
+	f.blobs[i] = blob
+
 	return blob, nil
 }
 
 func (f *file) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
-	off := req.Offset
+	offset := req.Offset
 
 	// Skip blobs before the offset
 	startContent := 0
-	for off > int64(f.sizes[startContent]) {
-		off -= int64(f.sizes[startContent])
+	for offset > int64(f.sizes[startContent]) {
+		offset -= int64(f.sizes[startContent])
 		startContent++
 	}
 
-	content := make([]byte, req.Size)
-	allContent := content
-	for i := startContent; i < len(f.sizes); i++ {
+	dst := resp.Data[0:req.Size]
+	readBytes := 0
+	remainingBytes := req.Size
+	for i := startContent; remainingBytes > 0 && i < len(f.sizes); i++ {
 		blob, err := f.getBlobAt(i)
 		if err != nil {
 			return err
 		}
 
-		blob = blob[off:]
-		off = 0
+		if offset > 0 {
+			blob = blob[offset:len(blob)]
+			offset = 0
+		}
 
-		var copied int
-		if len(blob) > len(content) {
-			copied = copy(content[0:], blob[:len(content)])
-		} else {
-			copied = copy(content[0:], blob)
-		}
-		content = content[copied:]
-		if len(content) == 0 {
-			break
-		}
+		copied := copy(dst, blob)
+		remainingBytes -= copied
+		readBytes += copied
+
+		dst = dst[copied:]
+	}
+	resp.Data = resp.Data[:readBytes]
+
+	return nil
+}
+
+func (f *file) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
+	for i := range f.blobs {
+		if f.blobs[i] != nil {
+			blobPool.Put(f.blobs[i])
+			f.blobs[i] = nil
+		}
 	}
 
-	resp.Data = allContent
 	return nil
 }
diff --git a/node.go b/node.go
index a89d52e9a..e607bc8b1 100644
--- a/node.go
+++ b/node.go
@@ -209,8 +209,19 @@ func (node Node) createFileAt(path string, repo *repository.Repository) error {
 		return errors.Annotate(err, "OpenFile")
 	}
 
+	var buf []byte
 	for _, id := range node.Content {
-		buf, err := repo.LoadBlob(pack.Data, id)
+		_, _, _, length, err := repo.Index().Lookup(id)
+		if err != nil {
+			return err
+		}
+
+		buf = buf[:cap(buf)]
+		if uint(len(buf)) < length {
+			buf = make([]byte, length)
+		}
+
+		buf, err := repo.LoadBlob(pack.Data, id, buf)
 		if err != nil {
 			return errors.Annotate(err, "Load")
 		}
diff --git a/repository/repository.go b/repository/repository.go
index a53e3d4b2..faef38f61 100644
--- a/repository/repository.go
+++ b/repository/repository.go
@@ -55,7 +55,6 @@ func (r *Repository) PrefixLength(t backend.Type) (int, error) {
 func (r *Repository) LoadAndDecrypt(t backend.Type, id backend.ID) ([]byte, error) {
 	debug.Log("Repo.Load", "load %v with id %v", t, id.Str())
 
-	// load blob from pack
 	rd, err := r.be.Get(t, id.String())
 	if err != nil {
 		debug.Log("Repo.Load", "error loading %v: %v", id.Str(), err)
@@ -87,8 +86,9 @@ func (r *Repository) LoadAndDecrypt(t backend.Type, id backend.ID) ([]byte, erro
 }
 
 // LoadBlob tries to load and decrypt content identified by t and id from a
-// pack from the backend.
-func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
+// pack from the backend, the result is stored in buf, which must be large
+// enough to hold the complete blob.
+func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byte, error) {
 	debug.Log("Repo.LoadBlob", "load %v with id %v", t, id.Str())
 	// lookup pack
 	packID, tpe, offset, length, err := r.idx.Lookup(id)
@@ -97,6 +97,10 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
 		return nil, err
 	}
 
+	if length > uint(cap(buf))+crypto.Extension {
+		return nil, errors.New("buf is too small")
+	}
+
 	if tpe != t {
 		debug.Log("Repo.LoadBlob", "wrong type returned for %v: wanted %v, got %v", id.Str(), t, tpe)
 		return nil, fmt.Errorf("blob has wrong type %v (wanted: %v)", tpe, t)
@@ -111,7 +115,9 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
 		return nil, err
 	}
 
-	buf, err := ioutil.ReadAll(rd)
+	// make buffer that is large enough for the complete blob
+	cbuf := make([]byte, length)
+	_, err = io.ReadFull(rd, cbuf)
 	if err != nil {
 		return nil, err
 	}
@@ -122,17 +128,17 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
 	}
 
 	// decrypt
-	plain, err := r.Decrypt(buf)
+	buf, err = r.decryptTo(buf, cbuf)
 	if err != nil {
 		return nil, err
 	}
 
 	// check hash
-	if !backend.Hash(plain).Equal(id) {
+	if !backend.Hash(buf).Equal(id) {
 		return nil, errors.New("invalid data returned")
 	}
 
-	return plain, nil
+	return buf, nil
 }
 
 // LoadJSONUnpacked decrypts the data and afterwards calls json.Unmarshal on
@@ -580,6 +586,12 @@ func (r *Repository) Init(password string) error {
 
 // Decrypt authenticates and decrypts ciphertext and returns the plaintext.
 func (r *Repository) Decrypt(ciphertext []byte) ([]byte, error) {
+	return r.decryptTo(nil, ciphertext)
+}
+
+// decrypt authenticates and decrypts ciphertext and stores the result in
+// plaintext.
+func (r *Repository) decryptTo(plaintext, ciphertext []byte) ([]byte, error) {
 	if r.key == nil {
 		return nil, errors.New("key for repository not set")
 	}
diff --git a/repository/repository_test.go b/repository/repository_test.go
index d978f83e4..f1012c90d 100644
--- a/repository/repository_test.go
+++ b/repository/repository_test.go
@@ -90,7 +90,7 @@ func TestSave(t *testing.T) {
 	OK(t, repo.Flush())
 
 	// read back
-	buf, err := repo.LoadBlob(pack.Data, id)
+	buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size))
 
 	Assert(t, len(buf) == len(data),
 		"number of bytes read back does not match: expected %d, got %d",
@@ -120,7 +120,7 @@ func TestSaveFrom(t *testing.T) {
 	OK(t, repo.Flush())
 
 	// read back
-	buf, err := repo.LoadBlob(pack.Data, id)
+	buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size))
 
 	Assert(t, len(buf) == len(data),
 		"number of bytes read back does not match: expected %d, got %d",
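[Editorial note, not part of the series: the following standalone sketch restates the buffer-handling pattern from PATCH 1/4 in isolation. All names here (bufPool, fetchBlob, loadWithPool) and the sizes are made up for illustration; fetchBlob merely stands in for the LoadBlob contract of filling a caller-supplied buffer and returning the filled slice.]

package main

import (
	"fmt"
	"sync"
)

const defaultBufSize = 128 * 1024

// bufPool hands out fixed-size buffers so repeated loads do not allocate.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, defaultBufSize) },
}

// fetchBlob is a hypothetical loader following the caller-supplied-buffer
// contract: dst must be large enough, and the filled slice is returned.
func fetchBlob(size int, dst []byte) ([]byte, error) {
	if cap(dst) < size {
		return nil, fmt.Errorf("dst too small: %d < %d", cap(dst), size)
	}
	dst = dst[:size]
	for i := range dst {
		dst[i] = byte(i) // stand-in for decrypted blob content
	}
	return dst, nil
}

// loadWithPool gets a buffer from the pool, grows it only when the blob is
// larger than the pooled size, and returns the filled slice. The caller puts
// the slice back into the pool when done.
func loadWithPool(size int) ([]byte, error) {
	buf := bufPool.Get().([]byte)
	buf = buf[:cap(buf)]

	if len(buf) < size {
		// return the insufficient pooled buffer before allocating a larger one
		if len(buf) >= defaultBufSize {
			bufPool.Put(buf)
		}
		buf = make([]byte, size)
	}

	return fetchBlob(size, buf)
}

func main() {
	blob, err := loadWithPool(64 * 1024)
	if err != nil {
		panic(err)
	}
	fmt.Println("loaded", len(blob), "bytes")
	bufPool.Put(blob[:cap(blob)]) // release, as the fuse Release handler does
}

The same idea appears in getBlobAt above: reuse a pooled buffer when it is large enough, otherwise hand the pooled buffer back and allocate a bigger one, and return everything to the pool when the file handle is released.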
From bd746a0425b9a7fd2e0986bfcc904d09f3222c19 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 26 Jul 2015 16:43:42 +0200
Subject: [PATCH 2/4] fuse: refactor and add tests for fuse.file

---
 cmd/restic/fuse/file.go      |  26 ++++--
 cmd/restic/fuse/file_test.go | 157 +++++++++++++++++++++++++++++++++++
 repository/repository.go    |   5 ++
 3 files changed, 179 insertions(+), 9 deletions(-)
 create mode 100644 cmd/restic/fuse/file_test.go

diff --git a/cmd/restic/fuse/file.go b/cmd/restic/fuse/file.go
index 895c844aa..97e46543b 100644
--- a/cmd/restic/fuse/file.go
+++ b/cmd/restic/fuse/file.go
@@ -4,8 +4,8 @@ import (
 	"sync"
 
 	"github.com/restic/restic"
+	"github.com/restic/restic/backend"
 	"github.com/restic/restic/pack"
-	"github.com/restic/restic/repository"
 
 	"bazil.org/fuse"
 	"bazil.org/fuse/fs"
@@ -16,11 +16,18 @@ import (
 var _ = fs.HandleReader(&file{})
 var _ = fs.HandleReleaser(&file{})
 
+// BlobLoader is an abstracted repository with a reduced set of methods used
+// for fuse operations.
+type BlobLoader interface {
+	LookupBlobSize(backend.ID) (uint, error)
+	LoadBlob(pack.BlobType, backend.ID, []byte) ([]byte, error)
+}
+
 type file struct {
-	repo *repository.Repository
+	repo BlobLoader
 	node *restic.Node
 
-	sizes []uint32
+	sizes []uint
 	blobs [][]byte
 }
 
@@ -32,14 +39,15 @@ var blobPool = sync.Pool{
 	},
 }
 
-func newFile(repo *repository.Repository, node *restic.Node) (*file, error) {
-	sizes := make([]uint32, len(node.Content))
-	for i, blobID := range node.Content {
-		length, err := repo.Index().LookupSize(blobID)
+func newFile(repo BlobLoader, node *restic.Node) (*file, error) {
+	sizes := make([]uint, len(node.Content))
+	for i, id := range node.Content {
+		size, err := repo.LookupBlobSize(id)
 		if err != nil {
 			return nil, err
 		}
-		sizes[i] = uint32(length)
+
+		sizes[i] = size
 	}
 
 	return &file{
@@ -65,7 +73,7 @@ func (f *file) getBlobAt(i int) (blob []byte, err error) {
 	buf := blobPool.Get().([]byte)
 	buf = buf[:cap(buf)]
 
-	if uint32(len(buf)) < f.sizes[i] {
+	if uint(len(buf)) < f.sizes[i] {
 		if len(buf) > defaultBlobSize {
 			blobPool.Put(buf)
 		}
diff --git a/cmd/restic/fuse/file_test.go b/cmd/restic/fuse/file_test.go
new file mode 100644
index 000000000..25ea574c3
--- /dev/null
+++ b/cmd/restic/fuse/file_test.go
@@ -0,0 +1,157 @@
+package fuse
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"bazil.org/fuse"
+
+	"github.com/restic/restic"
+	"github.com/restic/restic/backend"
+	"github.com/restic/restic/pack"
+	. "github.com/restic/restic/test"
+)
+
+type MockRepo struct {
+	blobs map[backend.ID][]byte
+}
+
+func NewMockRepo(content map[backend.ID][]byte) *MockRepo {
+	return &MockRepo{blobs: content}
+}
+
+func (m *MockRepo) LookupBlobSize(id backend.ID) (uint, error) {
+	buf, ok := m.blobs[id]
+	if !ok {
+		return 0, errors.New("blob not found")
+	}
+
+	return uint(len(buf)), nil
+}
+
+func (m *MockRepo) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byte, error) {
+	size, err := m.LookupBlobSize(id)
+	if err != nil {
+		return nil, err
+	}
+
+	if uint(cap(buf)) < size {
+		return nil, errors.New("buffer too small")
+	}
+
+	buf = buf[:size]
+	copy(buf, m.blobs[id])
+	return buf, nil
+}
+
+type MockContext struct{}
+
+func (m MockContext) Deadline() (time.Time, bool)       { return time.Now(), false }
+func (m MockContext) Done() <-chan struct{}             { return nil }
+func (m MockContext) Err() error                        { return nil }
+func (m MockContext) Value(key interface{}) interface{} { return nil }
+
+var testContent = genTestContent()
+var testContentLengths = []uint{
+	4646 * 1024,
+	655 * 1024,
+	378 * 1024,
+	8108 * 1024,
+	558 * 1024,
+}
+var testMaxFileSize uint
+
+func genTestContent() map[backend.ID][]byte {
+	m := make(map[backend.ID][]byte)
+
+	for _, length := range testContentLengths {
+		buf := Random(int(length), int(length))
+		id := backend.Hash(buf)
+		m[id] = buf
+		testMaxFileSize += length
+	}
+
+	return m
+}
+
+const maxBufSize = 20 * 1024 * 1024
+
+func testRead(t *testing.T, f *file, offset, length int, data []byte) []byte {
+	ctx := MockContext{}
+
+	req := &fuse.ReadRequest{
+		Offset: int64(offset),
+		Size:   length,
+	}
+	resp := &fuse.ReadResponse{
+		Data: make([]byte, length),
+	}
+	OK(t, f.Read(ctx, req, resp))
+
+	return resp.Data
+}
+
+var offsetReadsTests = []struct {
+	offset, length int
+}{
+	{0, 5 * 1024 * 1024},
+	{4000 * 1024, 1000 * 1024},
+}
+
+func TestFuseFile(t *testing.T) {
+	repo := NewMockRepo(testContent)
+	ctx := MockContext{}
+
+	memfile := make([]byte, 0, maxBufSize)
+
+	var ids backend.IDs
+	for id, buf := range repo.blobs {
+		ids = append(ids, id)
+		memfile = append(memfile, buf...)
+	}
+
+	node := &restic.Node{
+		Name:    "foo",
+		Inode:   23,
+		Mode:    0742,
+		Size:    42,
+		Content: ids,
+	}
+	f, err := newFile(repo, node)
+	OK(t, err)
+
+	attr := fuse.Attr{}
+	OK(t, f.Attr(ctx, &attr))
+
+	Equals(t, node.Inode, attr.Inode)
+	Equals(t, node.Mode, attr.Mode)
+	Equals(t, node.Size, attr.Size)
+
+	for i, test := range offsetReadsTests {
+		b := memfile[test.offset : test.offset+test.length]
+		res := testRead(t, f, test.offset, test.length, b)
+		if !bytes.Equal(b, res) {
+			t.Errorf("test %d failed, wrong data returned", i)
+		}
+	}
+
+	for i := 0; i < 200; i++ {
+		length := rand.Intn(int(testMaxFileSize) / 2)
+		offset := rand.Intn(int(testMaxFileSize))
+		if length+offset > int(testMaxFileSize) {
+			diff := length + offset - int(testMaxFileSize)
+			length -= diff
+		}
+
+		b := memfile[offset : offset+length]
+		fmt.Printf("test offset %d, length %d\n", offset, length)
+		res := testRead(t, f, offset, length, b)
+		if !bytes.Equal(b, res) {
+			t.Errorf("test %d failed (offset %d, length %d), wrong data returned", i, offset, length)
+		}
+	}
+}
diff --git a/repository/repository.go b/repository/repository.go
index faef38f61..40f9cc7ec 100644
--- a/repository/repository.go
+++ b/repository/repository.go
@@ -201,6 +201,11 @@ func (r *Repository) LoadJSONPack(t pack.BlobType, id backend.ID, item interface
 	return nil
 }
 
+// LookupBlobSize returns the size of blob id.
+func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) {
+	return r.Index().LookupSize(id)
+}
+
 const minPackSize = 4 * chunker.MiB
 const maxPackSize = 16 * chunker.MiB
 const maxPackers = 200
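[Editorial note, not part of the series: another illustrative sketch, mirroring the idea behind the BlobLoader interface and MockRepo in PATCH 2/4 — code that depends only on a small interface can be exercised against an in-memory map instead of a real repository. All identifiers here (blobLoader, mapLoader, readAll) are hypothetical and do not belong to the restic API.]

package main

import (
	"errors"
	"fmt"
)

// blobLoader mirrors the shape of the BlobLoader interface above: only the
// two methods the consuming code actually needs.
type blobLoader interface {
	lookupSize(id string) (uint, error)
	loadBlob(id string, buf []byte) ([]byte, error)
}

// mapLoader is an in-memory stand-in, analogous to MockRepo in the test file.
type mapLoader struct {
	blobs map[string][]byte
}

func (m mapLoader) lookupSize(id string) (uint, error) {
	b, ok := m.blobs[id]
	if !ok {
		return 0, errors.New("blob not found")
	}
	return uint(len(b)), nil
}

func (m mapLoader) loadBlob(id string, buf []byte) ([]byte, error) {
	size, err := m.lookupSize(id)
	if err != nil {
		return nil, err
	}
	if uint(cap(buf)) < size {
		return nil, errors.New("buffer too small")
	}
	buf = buf[:size]
	copy(buf, m.blobs[id])
	return buf, nil
}

// readAll is written against the interface only, so it works the same with
// the in-memory loader here as it would with a real repository.
func readAll(l blobLoader, id string) ([]byte, error) {
	size, err := l.lookupSize(id)
	if err != nil {
		return nil, err
	}
	return l.loadBlob(id, make([]byte, size))
}

func main() {
	l := mapLoader{blobs: map[string][]byte{"a": []byte("hello fuse")}}
	data, err := readAll(l, "a")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data)
}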
From d1629e1e4e1b176ee792212f327140d65c6528fa Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 26 Jul 2015 17:20:26 +0200
Subject: [PATCH 3/4] fuse: move to top level

---
 cmd/restic/cmd_mount.go                | 2 +-
 {cmd/restic/fuse => fuse}/dir.go       | 0
 {cmd/restic/fuse => fuse}/file.go      | 0
 {cmd/restic/fuse => fuse}/file_test.go | 0
 {cmd/restic/fuse => fuse}/fuse.go      | 0
 {cmd/restic/fuse => fuse}/link.go      | 0
 {cmd/restic/fuse => fuse}/snapshot.go  | 1 +
 7 files changed, 2 insertions(+), 1 deletion(-)
 rename {cmd/restic/fuse => fuse}/dir.go (100%)
 rename {cmd/restic/fuse => fuse}/file.go (100%)
 rename {cmd/restic/fuse => fuse}/file_test.go (100%)
 rename {cmd/restic/fuse => fuse}/fuse.go (100%)
 rename {cmd/restic/fuse => fuse}/link.go (100%)
 rename {cmd/restic/fuse => fuse}/snapshot.go (99%)

diff --git a/cmd/restic/cmd_mount.go b/cmd/restic/cmd_mount.go
index d685af98a..290ae4056 100644
--- a/cmd/restic/cmd_mount.go
+++ b/cmd/restic/cmd_mount.go
@@ -6,7 +6,7 @@ import (
 	"fmt"
 	"os"
 
-	"github.com/restic/restic/cmd/restic/fuse"
+	"github.com/restic/restic/fuse"
 
 	systemFuse "bazil.org/fuse"
 	"bazil.org/fuse/fs"
diff --git a/cmd/restic/fuse/dir.go b/fuse/dir.go
similarity index 100%
rename from cmd/restic/fuse/dir.go
rename to fuse/dir.go
diff --git a/cmd/restic/fuse/file.go b/fuse/file.go
similarity index 100%
rename from cmd/restic/fuse/file.go
rename to fuse/file.go
diff --git a/cmd/restic/fuse/file_test.go b/fuse/file_test.go
similarity index 100%
rename from cmd/restic/fuse/file_test.go
rename to fuse/file_test.go
diff --git a/cmd/restic/fuse/fuse.go b/fuse/fuse.go
similarity index 100%
rename from cmd/restic/fuse/fuse.go
rename to fuse/fuse.go
diff --git a/cmd/restic/fuse/link.go b/fuse/link.go
similarity index 100%
rename from cmd/restic/fuse/link.go
rename to fuse/link.go
diff --git a/cmd/restic/fuse/snapshot.go b/fuse/snapshot.go
similarity index 99%
rename from cmd/restic/fuse/snapshot.go
rename to fuse/snapshot.go
index f288615d1..6f82998c4 100644
--- a/cmd/restic/fuse/snapshot.go
+++ b/fuse/snapshot.go
@@ -60,6 +60,7 @@ func (sn *SnapshotsDir) updateCache(ctx context.Context) error {
 	}
 	return nil
 }
+
 func (sn *SnapshotsDir) get(name string) (snapshot SnapshotWithId, ok bool) {
 	sn.RLock()
 	snapshot, ok = sn.knownSnapshots[name]
From b85927576bccd00bae74391f4650d4dd9a2719b9 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sun, 26 Jul 2015 18:00:53 +0200
Subject: [PATCH 4/4] Address code review comments

---
 cmd/restic/cmd_cat.go     |  4 ++--
 repository/repository.go | 20 ++++++++++----------
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go
index 05f58c976..4e76f8237 100644
--- a/cmd/restic/cmd_cat.go
+++ b/cmd/restic/cmd_cat.go
@@ -162,12 +162,12 @@ func (cmd CmdCat) Execute(args []string) error {
 		return err
 
 	case "blob":
-		_, tpe, _, length, err := repo.Index().Lookup(id)
+		_, blobType, _, length, err := repo.Index().Lookup(id)
 		if err != nil {
 			return err
 		}
 
-		if tpe != pack.Data {
+		if blobType != pack.Data {
 			return errors.New("wrong type for blob")
 		}
 
diff --git a/repository/repository.go b/repository/repository.go
index 40f9cc7ec..0a4630cfa 100644
--- a/repository/repository.go
+++ b/repository/repository.go
@@ -86,9 +86,9 @@ func (r *Repository) LoadAndDecrypt(t backend.Type, id backend.ID) ([]byte, erro
 }
 
 // LoadBlob tries to load and decrypt content identified by t and id from a
-// pack from the backend, the result is stored in buf, which must be large
-// enough to hold the complete blob.
-func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byte, error) {
+// pack from the backend, the result is stored in plaintextBuf, which must be
+// large enough to hold the complete blob.
+func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, plaintextBuf []byte) ([]byte, error) {
 	debug.Log("Repo.LoadBlob", "load %v with id %v", t, id.Str())
 	// lookup pack
 	packID, tpe, offset, length, err := r.idx.Lookup(id)
@@ -97,8 +97,8 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byt
 		return nil, err
 	}
 
-	if length > uint(cap(buf))+crypto.Extension {
-		return nil, errors.New("buf is too small")
+	if length > uint(cap(plaintextBuf))+crypto.Extension {
+		return nil, fmt.Errorf("buf is too small, need %d more bytes", length-uint(cap(plaintextBuf))-crypto.Extension)
 	}
 
 	if tpe != t {
@@ -116,8 +116,8 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byt
 	}
 
 	// make buffer that is large enough for the complete blob
-	cbuf := make([]byte, length)
-	_, err = io.ReadFull(rd, cbuf)
+	ciphertextBuf := make([]byte, length)
+	_, err = io.ReadFull(rd, ciphertextBuf)
 	if err != nil {
 		return nil, err
 	}
@@ -128,17 +128,17 @@ func (r *Repository) LoadBlob(t pack.BlobType, id backend.ID, buf []byte) ([]byt
 	}
 
 	// decrypt
-	buf, err = r.decryptTo(buf, cbuf)
+	plaintextBuf, err = r.decryptTo(plaintextBuf, ciphertextBuf)
 	if err != nil {
 		return nil, err
 	}
 
 	// check hash
-	if !backend.Hash(buf).Equal(id) {
+	if !backend.Hash(plaintextBuf).Equal(id) {
 		return nil, errors.New("invalid data returned")
 	}
 
-	return buf, nil
+	return plaintextBuf, nil
 }
 
 // LoadJSONUnpacked decrypts the data and afterwards calls json.Unmarshal on