repository: Add StreamPacks function

The function supports efficiently loading a specified list of blobs from
a single pack in a streaming fashion. That is there's no need for
temporary files independent of the pack size.
This commit is contained in:
Michael Eischer 2021-08-20 23:21:05 +02:00
parent 153e2ba859
commit c4a2bfcb39
1 changed files with 85 additions and 0 deletions

View File

@ -1,12 +1,14 @@
package repository
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"sync"
"github.com/restic/chunker"
@ -25,6 +27,8 @@ import (
"golang.org/x/sync/errgroup"
)
const maxStreamBufferSize = 4 * 1024 * 1024
// Repository is used to access a repository in a backend.
type Repository struct {
be restic.Backend
@ -782,3 +786,84 @@ func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *
return tmpfile, hash, size, err
}
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
}
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
dataStart := blobs[0].Offset
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
// stream blobs in pack
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
bufferSize := int(dataEnd - dataStart)
if bufferSize > maxStreamBufferSize {
bufferSize = maxStreamBufferSize
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
currentBlobEnd := dataStart
var buf []byte
for _, entry := range blobs {
skipBytes := int(entry.Offset - currentBlobEnd)
if skipBytes < 0 {
return errors.Errorf("overlapping blobs in pack %v", packID)
}
_, err := bufRd.Discard(skipBytes)
if err != nil {
return err
}
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(buf)) < entry.Length {
buf = make([]byte, entry.Length)
}
buf = buf[:entry.Length]
n, err := io.ReadFull(bufRd, buf)
if err != nil {
debug.Log(" read error %v", err)
return errors.Wrap(err, "ReadFull")
}
if n != len(buf) {
return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, packID.Str(), len(buf), n)
}
currentBlobEnd = entry.Offset + entry.Length
// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():]
plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err == nil {
id := restic.Hash(plaintext)
if !id.Equal(entry.ID) {
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
h.Type, h.ID, packID.Str(), id)
err = errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, packID.Str(), id)
}
}
err = handleBlobFn(entry.BlobHandle, plaintext, err)
if err != nil {
return err
}
}
return nil
})
return errors.Wrap(err, "StreamPack")
}