PackerManager: use tempfiles instead of memory buffers

This commit is contained in:
Alexander Neumann 2016-03-06 12:26:25 +01:00
parent f893ec57cb
commit f956f60f9f
6 changed files with 170 additions and 55 deletions

View File

@ -83,26 +83,26 @@ type Packer struct {
bytes uint bytes uint
k *crypto.Key k *crypto.Key
buf *bytes.Buffer wr io.Writer
m sync.Mutex m sync.Mutex
} }
// NewPacker returns a new Packer that can be used to pack blobs // NewPacker returns a new Packer that can be used to pack blobs
// together. // together. If wr is nil, a bytes.Buffer is used.
func NewPacker(k *crypto.Key, buf []byte) *Packer { func NewPacker(k *crypto.Key, wr io.Writer) *Packer {
return &Packer{k: k, buf: bytes.NewBuffer(buf)} return &Packer{k: k, wr: wr}
} }
// Add saves the data read from rd as a new blob to the packer. Returned is the // Add saves the data read from rd as a new blob to the packer. Returned is the
// number of bytes written to the pack. // number of bytes written to the pack.
func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) { func (p *Packer) Add(t BlobType, id backend.ID, data []byte) (int, error) {
p.m.Lock() p.m.Lock()
defer p.m.Unlock() defer p.m.Unlock()
c := Blob{Type: t, ID: id} c := Blob{Type: t, ID: id}
n, err := io.Copy(p.buf, rd) n, err := p.wr.Write(data)
c.Length = uint(n) c.Length = uint(n)
c.Offset = p.bytes c.Offset = p.bytes
p.bytes += uint(n) p.bytes += uint(n)
@ -121,8 +121,9 @@ type headerEntry struct {
} }
// Finalize writes the header for all added blobs and finalizes the pack. // Finalize writes the header for all added blobs and finalizes the pack.
// Returned are all bytes written, including the header. // Returned are the number of bytes written, including the header. If the
func (p *Packer) Finalize() ([]byte, error) { // underlying writer implements io.Closer, it is closed.
func (p *Packer) Finalize() (uint, error) {
p.m.Lock() p.m.Lock()
defer p.m.Unlock() defer p.m.Unlock()
@ -131,37 +132,41 @@ func (p *Packer) Finalize() ([]byte, error) {
hdrBuf := bytes.NewBuffer(nil) hdrBuf := bytes.NewBuffer(nil)
bytesHeader, err := p.writeHeader(hdrBuf) bytesHeader, err := p.writeHeader(hdrBuf)
if err != nil { if err != nil {
return nil, err return 0, err
} }
encryptedHeader, err := crypto.Encrypt(p.k, nil, hdrBuf.Bytes()) encryptedHeader, err := crypto.Encrypt(p.k, nil, hdrBuf.Bytes())
if err != nil { if err != nil {
return nil, err return 0, err
} }
// append the header // append the header
n, err := p.buf.Write(encryptedHeader) n, err := p.wr.Write(encryptedHeader)
if err != nil { if err != nil {
return nil, err return 0, err
} }
hdrBytes := bytesHeader + crypto.Extension hdrBytes := bytesHeader + crypto.Extension
if uint(n) != hdrBytes { if uint(n) != hdrBytes {
return nil, errors.New("wrong number of bytes written") return 0, errors.New("wrong number of bytes written")
} }
bytesWritten += hdrBytes bytesWritten += hdrBytes
// write length // write length
err = binary.Write(p.buf, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension)) err = binary.Write(p.wr, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
if err != nil { if err != nil {
return nil, err return 0, err
} }
bytesWritten += uint(binary.Size(uint32(0))) bytesWritten += uint(binary.Size(uint32(0)))
p.bytes = uint(bytesWritten) p.bytes = uint(bytesWritten)
return p.buf.Bytes(), nil if w, ok := p.wr.(io.Closer); ok {
return bytesWritten, w.Close()
}
return bytesWritten, nil
} }
// writeHeader constructs and writes the header to wr. // writeHeader constructs and writes the header to wr.
@ -208,6 +213,11 @@ func (p *Packer) Blobs() []Blob {
return p.blobs return p.blobs
} }
// Writer return the underlying writer.
func (p *Packer) Writer() io.Writer {
return p.wr
}
func (p *Packer) String() string { func (p *Packer) String() string {
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes) return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
} }

View File

@ -1,6 +1,10 @@
package repository package repository
import ( import (
"fmt"
"io"
"io/ioutil"
"os"
"sync" "sync"
"restic/backend" "restic/backend"
@ -11,7 +15,7 @@ import (
// Saver implements saving data in a backend. // Saver implements saving data in a backend.
type Saver interface { type Saver interface {
Save(h backend.Handle, p []byte) error Save(h backend.Handle, jp []byte) error
} }
// packerManager keeps a list of open packs and creates new on demand. // packerManager keeps a list of open packs and creates new on demand.
@ -20,12 +24,30 @@ type packerManager struct {
key *crypto.Key key *crypto.Key
pm sync.Mutex pm sync.Mutex
packs []*pack.Packer packs []*pack.Packer
tempdir string
} }
const minPackSize = 4 * 1024 * 1024 const minPackSize = 4 * 1024 * 1024
const maxPackSize = 16 * 1024 * 1024 const maxPackSize = 16 * 1024 * 1024
const maxPackers = 200 const maxPackers = 200
// NewPackerManager returns an new packer manager which writes temporary files
// to a temporary directory
func NewPackerManager(be Saver, key *crypto.Key) (pm *packerManager, err error) {
pm = &packerManager{
be: be,
key: key,
}
pm.tempdir, err = ioutil.TempDir("", fmt.Sprintf("restic-packs-%d-", os.Getpid()))
if err != nil {
return nil, err
}
return pm, nil
}
// findPacker returns a packer for a new blob of size bytes. Either a new one is // findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs. // created or one is returned that already has some blobs.
func (r *packerManager) findPacker(size uint) (*pack.Packer, error) { func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
@ -47,7 +69,13 @@ func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
// no suitable packer found, return new // no suitable packer found, return new
debug.Log("Repo.findPacker", "create new pack for %d bytes", size) debug.Log("Repo.findPacker", "create new pack for %d bytes", size)
return pack.NewPacker(r.key, nil), nil tmpfile, err := ioutil.TempFile(r.tempdir, "restic-pack-")
if err != nil {
return nil, err
}
fmt.Printf("tmpfile: %v, tempdir %v\n", tmpfile.Name(), r.tempdir)
return pack.NewPacker(r.key, tmpfile), nil
} }
// insertPacker appends p to s.packs. // insertPacker appends p to s.packs.
@ -62,11 +90,28 @@ func (r *packerManager) insertPacker(p *pack.Packer) {
// savePacker stores p in the backend. // savePacker stores p in the backend.
func (r *Repository) savePacker(p *pack.Packer) error { func (r *Repository) savePacker(p *pack.Packer) error {
debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count()) debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count())
data, err := p.Finalize() n, err := p.Finalize()
if err != nil { if err != nil {
return err return err
} }
tmpfile := p.Writer().(*os.File)
f, err := os.Open(tmpfile.Name())
if err != nil {
return err
}
data := make([]byte, n)
m, err := io.ReadFull(f, data)
if uint(m) != n {
return fmt.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m)
}
if err = f.Close(); err != nil {
return err
}
id := backend.Hash(data) id := backend.Hash(data)
h := backend.Handle{Type: backend.Data, Name: id.String()} h := backend.Handle{Type: backend.Data, Name: id.String()}
@ -100,3 +145,10 @@ func (r *packerManager) countPacker() int {
return len(r.packs) return len(r.packs)
} }
// removeTempdir deletes the temporary directory.
func (r *packerManager) removeTempdir() error {
err := os.RemoveAll(r.tempdir)
r.tempdir = ""
return err
}

View File

@ -3,6 +3,7 @@ package repository
import ( import (
"io" "io"
"math/rand" "math/rand"
"os"
"restic/backend" "restic/backend"
"restic/backend/mem" "restic/backend/mem"
"restic/crypto" "restic/crypto"
@ -19,7 +20,39 @@ func randomID(rd io.Reader) backend.ID {
return id return id
} }
func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes int) { const maxBlobSize = 1 << 20
func saveFile(t testing.TB, be Saver, filename string, n int) {
f, err := os.Open(filename)
if err != nil {
t.Fatal(err)
}
data := make([]byte, n)
m, err := io.ReadFull(f, data)
if m != n {
t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, m, n)
}
if err = f.Close(); err != nil {
t.Fatal(err)
}
h := backend.Handle{Type: backend.Data, Name: backend.Hash(data).String()}
err = be.Save(h, data)
if err != nil {
t.Fatal(err)
}
err = os.Remove(filename)
if err != nil {
t.Fatal(err)
}
}
func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
l := rnd.Intn(1 << 20) l := rnd.Intn(1 << 20)
seed := rnd.Int63() seed := rnd.Int63()
@ -31,9 +64,14 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes
rd := rand.New(rand.NewSource(seed)) rd := rand.New(rand.NewSource(seed))
id := randomID(rd) id := randomID(rd)
n, err := packer.Add(pack.Data, id, io.LimitReader(rd, int64(l))) buf = buf[:l]
_, err = io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if n != int64(l) { n, err := packer.Add(pack.Data, id, buf)
if n != l {
t.Errorf("Add() returned invalid number of bytes: want %v, got %v", n, l) t.Errorf("Add() returned invalid number of bytes: want %v, got %v", n, l)
} }
bytes += l bytes += l
@ -43,17 +81,13 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes
continue continue
} }
data, err := packer.Finalize() bytesWritten, err := packer.Finalize()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
h := backend.Handle{Type: backend.Data, Name: randomID(rd).String()} tmpfile := packer.Writer().(*os.File)
saveFile(t, be, tmpfile.Name(), int(bytesWritten))
err = be.Save(h, data)
if err != nil {
t.Fatal(err)
}
} }
return bytes return bytes
@ -62,18 +96,14 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes
func flushRemainingPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes int) { func flushRemainingPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes int) {
if pm.countPacker() > 0 { if pm.countPacker() > 0 {
for _, packer := range pm.packs { for _, packer := range pm.packs {
data, err := packer.Finalize() n, err := packer.Finalize()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
bytes += len(data) bytes += int(n)
h := backend.Handle{Type: backend.Data, Name: randomID(rnd).String()} tmpfile := packer.Writer().(*os.File)
saveFile(t, be, tmpfile.Name(), bytes)
err = be.Save(h, data)
if err != nil {
t.Fatal(err)
}
} }
} }
@ -90,33 +120,45 @@ func TestPackerManager(t *testing.T) {
rnd := rand.New(rand.NewSource(23)) rnd := rand.New(rand.NewSource(23))
be := mem.New() be := mem.New()
pm := &packerManager{ pm, err := NewPackerManager(be, crypto.NewRandomKey())
be: be, if err != nil {
key: crypto.NewRandomKey(), t.Fatal(err)
} }
bytes := fillPacks(t, rnd, be, pm) blobBuf := make([]byte, maxBlobSize)
bytes := fillPacks(t, rnd, be, pm, blobBuf)
bytes += flushRemainingPacks(t, rnd, be, pm) bytes += flushRemainingPacks(t, rnd, be, pm)
t.Logf("saved %d bytes", bytes) t.Logf("saved %d bytes", bytes)
err = pm.removeTempdir()
if err != nil {
t.Fatal(err)
}
} }
func BenchmarkPackerManager(t *testing.B) { func BenchmarkPackerManager(t *testing.B) {
rnd := rand.New(rand.NewSource(23)) rnd := rand.New(rand.NewSource(23))
be := &fakeBackend{} be := &fakeBackend{}
pm := &packerManager{ pm, err := NewPackerManager(be, crypto.NewRandomKey())
be: be, if err != nil {
key: crypto.NewRandomKey(), t.Fatal(err)
} }
blobBuf := make([]byte, maxBlobSize)
t.ResetTimer() t.ResetTimer()
bytes := 0 bytes := 0
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
bytes += fillPacks(t, rnd, be, pm) bytes += fillPacks(t, rnd, be, pm, blobBuf)
} }
bytes += flushRemainingPacks(t, rnd, be, pm) bytes += flushRemainingPacks(t, rnd, be, pm)
t.Logf("saved %d bytes", bytes) t.Logf("saved %d bytes", bytes)
err = pm.removeTempdir()
if err != nil {
t.Fatal(err)
}
} }

View File

@ -27,14 +27,19 @@ type Repository struct {
} }
// New returns a new repository with backend be. // New returns a new repository with backend be.
func New(be backend.Backend) *Repository { func New(be backend.Backend) (*Repository, error) {
return &Repository{ pm, err := NewPackerManager(be, nil)
be: be, if err != nil {
idx: NewMasterIndex(), return nil, err
packerManager: &packerManager{
be: be,
},
} }
repo := &Repository{
be: be,
idx: NewMasterIndex(),
packerManager: pm,
}
return repo, nil
} }
// Find loads the list of all blobs of type t and searches for names which start // Find loads the list of all blobs of type t and searches for names which start
@ -195,7 +200,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
} }
// save ciphertext // save ciphertext
_, err = packer.Add(t, *id, bytes.NewReader(ciphertext)) _, err = packer.Add(t, *id, ciphertext)
if err != nil { if err != nil {
return backend.ID{}, err return backend.ID{}, err
} }

View File

@ -61,7 +61,10 @@ func SetupRepo() *repository.Repository {
panic(err) panic(err)
} }
repo := repository.New(b) repo, err := repository.New(b)
if err != nil {
panic(err)
}
err = repo.Init(TestPassword) err = repo.Init(TestPassword)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -213,7 +213,10 @@ func OpenLocalRepo(t testing.TB, dir string) *repository.Repository {
be, err := local.Open(dir) be, err := local.Open(dir)
OK(t, err) OK(t, err)
repo := repository.New(be) repo, err := repository.New(be)
if err != nil {
t.Fatal(err)
}
err = repo.SearchKey(TestPassword) err = repo.SearchKey(TestPassword)
OK(t, err) OK(t, err)