Compare commits

...

8 Commits

Author SHA1 Message Date
Michael Eischer 2ab863a0f4
Merge 20d8eed400 into faffd15d13 2024-04-24 23:58:42 +00:00
Michael Eischer faffd15d13
Merge pull request #4734 from maouw/enhancement/envvar-for-host
Add support for specifying --host via environment variable
2024-04-24 20:00:15 +00:00
Michael Eischer 347e9d0765 complete RESITC_HOST environment handling & test 2024-04-24 21:52:39 +02:00
Altan Orhon 871ea1eaf3 Add support for specifying --host via environment variable
This commit adds support for specifying the `--host` option via the `RESTIC_HOST` environment variable. This is done by extending option processing in `cmd_backup.go` and for `restic.SnapshotFilter` in `find.go`.
2024-04-24 21:49:42 +02:00
Michael Eischer 20d8eed400 repository: streamPack: separate requests for gap larger than 1MB
With most cloud providers, traffic is much more expensive than API
calls. Thus slightly bias streamPack towards a bit more API calls in
exchange for slightly less traffic.
2024-04-22 21:21:23 +02:00
Michael Eischer cf700d8794 repository: streamPack: reuse zstd decoder 2024-04-22 21:21:23 +02:00
Michael Eischer 666a0b0bdb repository: streamPack: replace streaming with chunked download
Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remains open. This
introduces partial failures which massively complicate the error
handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.

To avoid multiple copies in memory, an auxiliary interface
`discardReader` is introduced that allows directly accessing the
downloaded byte slices, while still supporting the streaming used by the
`check` command.
2024-04-22 21:21:23 +02:00
Michael Eischer 621012dac0 repository: Add blob loading fallback to LoadBlobsFromPack
Try to retrieve individual blobs via LoadBlob if streaming did not work.
2024-04-21 21:35:55 +02:00
8 changed files with 330 additions and 85 deletions

View File

@ -0,0 +1,10 @@
Enhancement: Allow specifying `--host` via environment variable
Restic commands that operate on snapshots, such as `restic backup` and
`restic snapshots`, support the `--host` flag to specify the hostname for
grouoping snapshots. They now permit selecting the hostname via the
environment variable `RESTIC_HOST`. `--host` still takes precedence over the
environment variable.
https://github.com/restic/restic/issues/4733
https://github.com/restic/restic/pull/4734

View File

@ -114,7 +114,7 @@ func init() {
f.BoolVar(&backupOptions.StdinCommand, "stdin-from-command", false, "interpret arguments as command to execute and store its stdout")
f.Var(&backupOptions.Tags, "tag", "add `tags` for the new snapshot in the format `tag[,tag,...]` (can be specified multiple times)")
f.UintVar(&backupOptions.ReadConcurrency, "read-concurrency", 0, "read `n` files concurrently (default: $RESTIC_READ_CONCURRENCY or 2)")
f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually. To prevent an expensive rescan use the \"parent\" flag")
f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually (default: $RESTIC_HOST). To prevent an expensive rescan use the \"parent\" flag")
f.StringVar(&backupOptions.Host, "hostname", "", "set the `hostname` for the snapshot manually")
err := f.MarkDeprecated("hostname", "use --host")
if err != nil {
@ -137,6 +137,11 @@ func init() {
// parse read concurrency from env, on error the default value will be used
readConcurrency, _ := strconv.ParseUint(os.Getenv("RESTIC_READ_CONCURRENCY"), 10, 32)
backupOptions.ReadConcurrency = uint(readConcurrency)
// parse host from env, if not exists or empty the default value will be used
if host := os.Getenv("RESTIC_HOST"); host != "" {
backupOptions.Host = host
}
}
// filterExisting returns a slice of all existing items, or an error if no

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"os"
"github.com/restic/restic/internal/restic"
"github.com/spf13/pflag"
@ -14,17 +15,27 @@ func initMultiSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter,
if !addHostShorthand {
hostShorthand = ""
}
flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times)")
flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times) (default: $RESTIC_HOST)")
flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]` (can be specified multiple times)")
flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path` (can be specified multiple times)")
// set default based on env if set
if host := os.Getenv("RESTIC_HOST"); host != "" {
filt.Hosts = []string{host}
}
}
// initSingleSnapshotFilter is used for commands that work on a single snapshot
// MUST be combined with restic.FindFilteredSnapshot
func initSingleSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter) {
flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times)")
flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times) (default: $RESTIC_HOST)")
flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]`, when snapshot ID \"latest\" is given (can be specified multiple times)")
flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path`, when snapshot ID \"latest\" is given (can be specified multiple times)")
// set default based on env if set
if host := os.Getenv("RESTIC_HOST"); host != "" {
filt.Hosts = []string{host}
}
}
// FindFilteredSnapshots yields Snapshots, either given explicitly by `snapshotIDs` or filtered from the list of all snapshots.

61
cmd/restic/find_test.go Normal file
View File

@ -0,0 +1,61 @@
package main
import (
"testing"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"github.com/spf13/pflag"
)
func TestSnapshotFilter(t *testing.T) {
for _, test := range []struct {
name string
args []string
expected []string
env string
}{
{
"no value",
[]string{},
nil,
"",
},
{
"args only",
[]string{"--host", "abc"},
[]string{"abc"},
"",
},
{
"env default",
[]string{},
[]string{"def"},
"def",
},
{
"both",
[]string{"--host", "abc"},
[]string{"abc"},
"def",
},
} {
t.Run(test.name, func(t *testing.T) {
t.Setenv("RESTIC_HOST", test.env)
for _, mode := range []bool{false, true} {
set := pflag.NewFlagSet("test", pflag.PanicOnError)
flt := &restic.SnapshotFilter{}
if mode {
initMultiSnapshotFilter(set, flt, false)
} else {
initSingleSnapshotFilter(set, flt)
}
err := set.Parse(test.args)
rtest.OK(t, err)
rtest.Equals(t, test.expected, flt.Hosts, "unexpected hosts")
}
})
}
}

View File

@ -567,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
@ -653,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
return nil
}
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}
const maxStreamBufferSize = 4 * 1024 * 1024
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
@ -675,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// run workers
for i := 0; i < workerCount; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)

View File

@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
// a required blob couldn't be retrieved
return err
}
keepMutex.Lock()

View File

@ -1,7 +1,6 @@
package repository
import (
"bufio"
"bytes"
"context"
"fmt"
@ -12,7 +11,6 @@ import (
"sort"
"sync"
"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/zstd"
"github.com/restic/chunker"
"github.com/restic/restic/internal/backend"
@ -29,8 +27,6 @@ import (
"golang.org/x/sync/errgroup"
)
const MaxStreamBufferSize = 4 * 1024 * 1024
const MinPackSize = 4 * 1024 * 1024
const DefaultPackSize = 16 * 1024 * 1024
const MaxPackSize = 128 * 1024 * 1024
@ -966,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
}
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)
// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// Skip sections with more than 1MB unused blobs
const maxUnusedRange = 1 * 1024 * 1024
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it.
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
}
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
@ -990,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lowerIdx := 0
lastPos := blobs[0].Offset
const maxChunkSize = 2 * DefaultPackSize
for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID)
}
chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
split := false
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
split = true
}
// skip too large gaps as a new request is typically much cheaper than data transfers
if blobs[i].Offset-lastPos > maxUnusedRange {
split = true
}
if split {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
@ -1006,10 +1019,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
}
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
dataStart := blobs[0].Offset
@ -1017,57 +1030,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
return backoff.Permanent(err)
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return nil
data := make([]byte, int(dataEnd-dataStart))
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
_, cerr := io.ReadFull(rd, data)
return cerr
})
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
// the context is only still valid if handleBlobFn never returned an error
if loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for _, entry := range blobs {
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
err = handleBlobFn(entry.BlobHandle, buf, ierr)
if err != nil {
break
}
}
}
return errors.Wrap(err, "StreamPack")
}
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
}
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
return err
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return errors.Wrap(err, "StreamPack")
}
// discardReader allows the PackBlobIterator to perform zero copy
// reads if the underlying data source is a byte slice.
type discardReader interface {
Discard(n int) (discarded int, err error)
// ReadFull reads the next n bytes into a byte slice. The caller must not
// retain a reference to the byte. Modifications are only allowed within
// the boundaries of the returned slice.
ReadFull(n int) (buf []byte, err error)
}
type byteReader struct {
buf []byte
}
func newByteReader(buf []byte) *byteReader {
return &byteReader{
buf: buf,
}
}
func (b *byteReader) Discard(n int) (discarded int, err error) {
if len(b.buf) < n {
return 0, io.ErrUnexpectedEOF
}
b.buf = b.buf[n:]
return n, nil
}
func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
if len(b.buf) < n {
return nil, io.ErrUnexpectedEOF
}
buf = b.buf[:n]
b.buf = b.buf[n:]
return buf, nil
}
type PackBlobIterator struct {
packID restic.ID
rd *bufio.Reader
rd discardReader
currentOffset uint
blobs []restic.Blob
key *crypto.Key
dec *zstd.Decoder
buf []byte
decode []byte
}
@ -1079,7 +1143,7 @@ type PackBlobValue struct {
var ErrPackEOF = errors.New("reached EOF of pack file")
func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{
packID: packID,
@ -1114,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(b.buf)) < entry.Length {
b.buf = make([]byte, entry.Length)
}
b.buf = b.buf[:entry.Length]
n, err := io.ReadFull(b.rd, b.buf)
buf, err := b.rd.ReadFull(int(entry.Length))
if err != nil {
debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
}
if n != len(b.buf) {
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, b.packID.Str(), len(b.buf), n)
}
b.currentOffset = entry.Offset + entry.Length
if int(entry.Length) <= b.key.NonceSize() {
@ -1137,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
}
// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)

View File

@ -146,14 +146,14 @@ func TestStreamPack(t *testing.T) {
}
func testStreamPack(t *testing.T, version uint) {
// always use the same key for deterministic output
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
dec, err := zstd.NewReader(nil)
if err != nil {
t.Fatal(err)
panic(dec)
}
defer dec.Close()
// always use the same key for deterministic output
key := testKey(t)
blobSizes := []int{
5522811,
@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
loadCalls = 0
shortFirstLoad = test.shortFirstLoad
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
return err
}
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}
@ -449,3 +449,83 @@ func TestUnpackedVerification(t *testing.T) {
}
}
}
func testKey(t *testing.T) crypto.Key {
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
return key
}
func TestStreamPackFallback(t *testing.T) {
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
test := func(t *testing.T, failLoad bool) {
key := testKey(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
plaintext := rtest.Random(800, 42)
blobID := restic.Hash(plaintext)
blobs := []restic.Blob{
{
Length: uint(crypto.CiphertextLength(len(plaintext))),
Offset: 0,
BlobHandle: restic.BlobHandle{
ID: blobID,
Type: restic.DataBlob,
},
},
}
var loadPack backendLoadFn
if failLoad {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return errors.New("load error")
}
} else {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// just return an empty array to provoke an error
data := make([]byte, length)
return fn(bytes.NewReader(data))
}
}
loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if id == blobID {
return plaintext, nil
}
return nil, errors.New("unknown blob")
}
blobOK := false
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
rtest.OK(t, err)
rtest.Equals(t, blobID, blob.ID)
rtest.Equals(t, plaintext, buf)
blobOK = true
return err
}
err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
rtest.OK(t, err)
rtest.Assert(t, blobOK, "blob failed to load")
}
t.Run("corrupted blob", func(t *testing.T) {
test(t, false)
})
// test fallback for failed pack loading
t.Run("failed load", func(t *testing.T) {
test(t, true)
})
}