From 3e0acf13956c63bae371873b19504a726152c264 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Wed, 18 Nov 2020 12:36:06 +0100 Subject: [PATCH] restore: Don't save (part of) pack in memory --- internal/restorer/filerestorer.go | 121 +++++++++++++++++------------- 1 file changed, 67 insertions(+), 54 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 709301d82..6a2dfbd20 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,11 +1,12 @@ package restorer import ( - "bytes" + "bufio" "context" "io" "math" "path/filepath" + "sort" "sync" "github.com/restic/restic/internal/crypto" @@ -179,6 +180,8 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return nil } +const maxBufferSize = 4 * 1024 * 1024 + func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { // calculate pack byte range and blob->[]files->[]offsets mappings @@ -226,18 +229,12 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { } } - packData := make([]byte, int(end-start)) - - h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()} - err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error { - l, err := io.ReadFull(rd, packData) - if err != nil { - return err - } - if l != len(packData) { - return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l) - } - return nil + sortedBlobs := make([]restic.ID, 0, len(blobs)) + for blobID := range blobs { + sortedBlobs = append(sortedBlobs, blobID) + } + sort.Slice(sortedBlobs, func(i, j int) bool { + return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset }) markFileError := func(file *fileInfo, err error) { @@ -248,6 +245,61 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { } } + h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()} + err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) 
error { + bufferSize := int(end - start) + if bufferSize > maxBufferSize { + bufferSize = maxBufferSize + } + bufRd := bufio.NewReaderSize(rd, bufferSize) + currentBlobEnd := start + for _, blobID := range sortedBlobs { + blob := blobs[blobID] + _, err := bufRd.Discard(int(blob.offset - currentBlobEnd)) + if err != nil { + return err + } + blobData, err := r.loadBlob(bufRd, blobID, blob.length) + if err != nil { + for file := range blob.files { + markFileError(file, err) + } + continue + } + currentBlobEnd = blob.offset + int64(blob.length) + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + file.lock.Lock() + create := file.flags&fileProgress == 0 + createSize := int64(-1) + if create { + defer file.lock.Unlock() + file.flags |= fileProgress + createSize = file.size + } else { + file.lock.Unlock() + } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) + } + err := writeToFile() + if err != nil { + markFileError(file, err) + break + } + } + } + } + + return nil + }) + if err != nil { for file := range pack.files { markFileError(file, err) @@ -255,53 +307,14 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { return } - rd := bytes.NewReader(packData) - - for blobID, blob := range blobs { - blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length) - if err != nil { - for file := range blob.files { - markFileError(file, err) - } - continue - } - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - 
must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - file.lock.Lock() - create := file.flags&fileProgress == 0 - createSize := int64(-1) - if create { - defer file.lock.Unlock() - file.flags |= fileProgress - createSize = file.size - } else { - file.lock.Unlock() - } - return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) - } - err := writeToFile() - if err != nil { - markFileError(file, err) - break - } - } - } - } } -func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) { +func (r *fileRestorer) loadBlob(rd io.Reader, blobID restic.ID, length int) ([]byte, error) { // TODO reconcile with Repository#loadBlob implementation buf := make([]byte, length) - n, err := rd.ReadAt(buf, offset) + n, err := io.ReadFull(rd, buf) if err != nil { return nil, err }