From 2ce49ea0ee9c05a0f5db089098e465fe3227c1de Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 22 Feb 2016 21:09:21 +0100 Subject: [PATCH] Update code to use the new Chunker interface --- src/restic/archiver.go | 18 ++++---- src/restic/archiver_test.go | 55 ++++++++++--------------- src/restic/buffer_pool.go | 21 ++++++++++ src/restic/repository/packer_manager.go | 5 +-- 4 files changed, 53 insertions(+), 46 deletions(-) create mode 100644 src/restic/buffer_pool.go diff --git a/src/restic/archiver.go b/src/restic/archiver.go index d005bb8f4..9ff7ca10b 100644 --- a/src/restic/archiver.go +++ b/src/restic/archiver.go @@ -1,7 +1,7 @@ package restic import ( - "crypto/sha256" + "bytes" "encoding/json" "fmt" "io" @@ -11,13 +11,14 @@ import ( "sync" "time" - "github.com/restic/chunker" "restic/backend" "restic/debug" "restic/pack" "restic/pipe" "restic/repository" + "github.com/restic/chunker" + "github.com/juju/errors" ) @@ -154,12 +155,11 @@ type saveResult struct { bytes uint64 } -func (arch *Archiver) saveChunk(chunk *chunker.Chunk, p *Progress, token struct{}, file *os.File, resultChannel chan<- saveResult) { - hash := chunk.Digest - id := backend.ID{} - copy(id[:], hash) +func (arch *Archiver) saveChunk(chunk chunker.Chunk, p *Progress, token struct{}, file *os.File, resultChannel chan<- saveResult) { + defer freeBuf(chunk.Data) - err := arch.Save(pack.Data, id, chunk.Length, chunk.Reader(file)) + id := backend.Hash(chunk.Data) + err := arch.Save(pack.Data, id, chunk.Length, bytes.NewReader(chunk.Data)) // TODO handle error if err != nil { panic(err) @@ -220,11 +220,11 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) error { return err } - chnker := chunker.New(file, arch.repo.Config.ChunkerPolynomial, sha256.New()) + chnker := chunker.New(file, arch.repo.Config.ChunkerPolynomial) resultChannels := [](<-chan saveResult){} for { - chunk, err := chnker.Next() + chunk, err := chnker.Next(getBuf()) if err == io.EOF { break } diff --git a/src/restic/archiver_test.go b/src/restic/archiver_test.go index d38a73b80..813cc3362 100644 --- a/src/restic/archiver_test.go +++ b/src/restic/archiver_test.go @@ -2,12 +2,10 @@ package restic_test import ( "bytes" - "crypto/sha256" "io" "testing" "time" - "github.com/restic/chunker" "restic" "restic/backend" "restic/checker" @@ -15,6 +13,8 @@ import ( "restic/pack" "restic/repository" . "restic/test" + + "github.com/restic/chunker" ) var testPol = chunker.Pol(0x3DA3358B4DC173) @@ -24,17 +24,12 @@ type Rdr interface { io.ReaderAt } -type chunkedData struct { - buf []byte - chunks []*chunker.Chunk -} - func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) { rd.Seek(0, 0) - ch := chunker.New(rd, testPol, sha256.New()) + ch := chunker.New(rd, testPol) for { - chunk, err := ch.Next() + chunk, err := ch.Next(buf) if err == io.EOF { break @@ -43,12 +38,10 @@ func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.K OK(b, err) // reduce length of buf - buf = buf[:chunk.Length] - n, err := io.ReadFull(chunk.Reader(rd), buf) - OK(b, err) - Assert(b, uint(n) == chunk.Length, "invalid length: got %d, expected %d", n, chunk.Length) + Assert(b, uint(len(chunk.Data)) == chunk.Length, + "invalid length: got %d, expected %d", len(chunk.Data), chunk.Length) - _, err = crypto.Encrypt(key, buf2, buf) + _, err = crypto.Encrypt(key, buf2, chunk.Data) OK(b, err) } } @@ -72,18 +65,16 @@ func BenchmarkChunkEncrypt(b *testing.B) { } func benchmarkChunkEncryptP(b *testing.PB, buf []byte, rd Rdr, key *crypto.Key) { - ch := chunker.New(rd, testPol, sha256.New()) + ch := chunker.New(rd, testPol) for { - chunk, err := ch.Next() + chunk, err := ch.Next(buf) if err == io.EOF { break } // reduce length of chunkBuf - buf = buf[:chunk.Length] - io.ReadFull(chunk.Reader(rd), buf) - crypto.Encrypt(key, buf, buf) + crypto.Encrypt(key, chunk.Data, chunk.Data) } } @@ -258,8 +249,7 @@ func testParallelSaveWithDuplication(t *testing.T, seed int) { duplication := 7 arch := restic.NewArchiver(repo) - data, chunks := getRandomData(seed, dataSizeMb*1024*1024) - reader := bytes.NewReader(data) + chunks := getRandomData(seed, dataSizeMb*1024*1024) errChannels := [](<-chan error){} @@ -272,18 +262,15 @@ func testParallelSaveWithDuplication(t *testing.T, seed int) { errChan := make(chan error) errChannels = append(errChannels, errChan) - go func(reader *bytes.Reader, c *chunker.Chunk, errChan chan<- error) { + go func(c chunker.Chunk, errChan chan<- error) { barrier <- struct{}{} - hash := c.Digest - id := backend.ID{} - copy(id[:], hash) - - time.Sleep(time.Duration(hash[0])) - err := arch.Save(pack.Data, id, c.Length, c.Reader(reader)) + id := backend.Hash(c.Data) + time.Sleep(time.Duration(id[0])) + err := arch.Save(pack.Data, id, c.Length, bytes.NewReader(c.Data)) <-barrier errChan <- err - }(reader, c, errChan) + }(c, errChan) } } @@ -298,20 +285,20 @@ func testParallelSaveWithDuplication(t *testing.T, seed int) { assertNoUnreferencedPacks(t, chkr) } -func getRandomData(seed int, size int) ([]byte, []*chunker.Chunk) { +func getRandomData(seed int, size int) []chunker.Chunk { buf := Random(seed, size) - chunks := []*chunker.Chunk{} - chunker := chunker.New(bytes.NewReader(buf), testPol, sha256.New()) + var chunks []chunker.Chunk + chunker := chunker.New(bytes.NewReader(buf), testPol) for { - c, err := chunker.Next() + c, err := chunker.Next(nil) if err == io.EOF { break } chunks = append(chunks, c) } - return buf, chunks + return chunks } func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker { diff --git a/src/restic/buffer_pool.go b/src/restic/buffer_pool.go new file mode 100644 index 000000000..25603bbfe --- /dev/null +++ b/src/restic/buffer_pool.go @@ -0,0 +1,21 @@ +package restic + +import ( + "sync" + + "github.com/restic/chunker" +) + +var bufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, chunker.MinSize) + }, +} + +func getBuf() []byte { + return bufPool.Get().([]byte) +} + +func freeBuf(data []byte) { + bufPool.Put(data) +} diff --git a/src/restic/repository/packer_manager.go b/src/restic/repository/packer_manager.go index 51a8ae888..a7716418e 100644 --- a/src/restic/repository/packer_manager.go +++ b/src/restic/repository/packer_manager.go @@ -3,7 +3,6 @@ package repository import ( "sync" - "github.com/restic/chunker" "restic/backend" "restic/crypto" "restic/debug" @@ -18,8 +17,8 @@ type packerManager struct { packs []*pack.Packer } -const minPackSize = 4 * chunker.MiB -const maxPackSize = 16 * chunker.MiB +const minPackSize = 4 * 1024 * 1024 +const maxPackSize = 16 * 1024 * 1024 const maxPackers = 200 // findPacker returns a packer for a new blob of size bytes. Either a new one is