Compare commits


8 Commits

Author SHA1 Message Date
Michael Eischer 2ab863a0f4
Merge 20d8eed400 into faffd15d13 2024-04-24 23:58:42 +00:00
Michael Eischer faffd15d13
Merge pull request #4734 from maouw/enhancement/envvar-for-host
Add support for specifying --host via environment variable
2024-04-24 20:00:15 +00:00
Michael Eischer 347e9d0765 complete RESTIC_HOST environment handling & test 2024-04-24 21:52:39 +02:00
Altan Orhon 871ea1eaf3 Add support for specifying --host via environment variable
This commit adds support for specifying the `--host` option via the `RESTIC_HOST` environment variable. This is done by extending the option processing in `cmd_backup.go` and the `restic.SnapshotFilter` setup in `find.go`.
2024-04-24 21:49:42 +02:00
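
The precedence rule can be seen in a minimal standalone pflag sketch (illustrative, not restic's actual wiring): the environment pre-fills the flag's value before parsing, mirroring the init() hook in the cmd_backup.go diff below, so an explicit --host parsed afterwards always wins.

package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

func main() {
	flags := pflag.NewFlagSet("example", pflag.ExitOnError)
	host := flags.StringP("host", "H", "", "snapshot `hostname` (default: $RESTIC_HOST)")

	// Pre-fill the value from the environment before parsing: pflag only
	// writes to *host for flags actually present on the command line, so an
	// explicit --host overwrites the env value, otherwise the env value stays.
	if env := os.Getenv("RESTIC_HOST"); env != "" {
		*host = env
	}

	_ = flags.Parse(os.Args[1:])
	fmt.Println("host:", *host)
}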
Michael Eischer 20d8eed400 repository: streamPack: separate requests for gap larger than 1MB
With most cloud providers, traffic is much more expensive than API
calls. Thus, bias streamPack towards slightly more API calls in
exchange for less traffic.
2024-04-22 21:21:23 +02:00
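
A back-of-the-envelope check of that tradeoff, with illustrative prices (the per-request and per-GiB figures below are assumptions, not from this commit):

package main

import "fmt"

func main() {
	// Illustrative S3-style prices; both figures are assumptions.
	const pricePerGet = 0.0000004 // USD per GET request
	const pricePerGiB = 0.09      // USD per GiB of egress traffic
	pricePerByte := pricePerGiB / (1 << 30)

	// Reading through a gap pays for the gap's traffic; skipping it pays
	// for one extra ranged GET. Break-even: gap * pricePerByte == pricePerGet.
	breakEven := pricePerGet / pricePerByte
	fmt.Printf("break-even gap: %.0f bytes (~%.1f KiB)\n", breakEven, breakEven/1024)
	// Prints roughly 4772 bytes (~4.7 KiB): by pure transfer cost even a
	// few-KiB gap justifies a new request, so a 1 MiB threshold is
	// conservative and leaves headroom for per-request latency.
}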
Michael Eischer cf700d8794 repository: streamPack: reuse zstd decoder 2024-04-22 21:21:23 +02:00
Michael Eischer 666a0b0bdb repository: streamPack: replace streaming with chunked download
Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remain open. This
introduces partial failures which massively complicate the error
handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.

To avoid multiple copies in memory, an auxiliary interface
`discardReader` is introduced that allows directly accessing the
downloaded byte slices, while still supporting the streaming used by the
`check` command.
2024-04-22 21:21:23 +02:00
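
The resulting splitting rule can be sketched in isolation (a simplified model of the loop added to streamPack in the diff below; blob and splitPoints are illustrative names, while the constants match the diff: maxChunkSize = 2 * DefaultPackSize = 32 MiB, maxUnusedRange = 1 MiB):

package main

import "fmt"

// blob mirrors just the fields the splitting rule needs (illustrative).
type blob struct{ offset, length uint }

const (
	maxChunkSize   = 32 << 20 // 2 * DefaultPackSize in the diff below
	maxUnusedRange = 1 << 20  // skip gaps larger than 1 MiB
)

// splitPoints returns the indices where a new download request starts:
// before a blob that would push the chunk past maxChunkSize (unless the
// chunk would otherwise be empty), or after a gap larger than maxUnusedRange.
func splitPoints(blobs []blob) []int {
	var splits []int
	lower := 0
	lastEnd := blobs[0].offset
	for i, b := range blobs {
		chunkSize := b.offset + b.length - blobs[lower].offset
		if (i > lower && chunkSize >= maxChunkSize) || b.offset-lastEnd > maxUnusedRange {
			splits = append(splits, i)
			lower = i
		}
		lastEnd = b.offset + b.length
	}
	return splits
}

func main() {
	blobs := []blob{{0, 1 << 20}, {1 << 20, 1 << 20}, {40 << 20, 1 << 20}}
	fmt.Println(splitPoints(blobs)) // [2]: the 38 MiB gap forces a separate request
}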
Michael Eischer 621012dac0 repository: Add blob loading fallback to LoadBlobsFromPack
Try to retrieve individual blobs via LoadBlob if streaming did not work.
2024-04-21 21:35:55 +02:00
8 changed files with 330 additions and 85 deletions

[changelog entry (new file)]

@@ -0,0 +1,10 @@
+Enhancement: Allow specifying `--host` via environment variable
+
+Restic commands that operate on snapshots, such as `restic backup` and
+`restic snapshots`, support the `--host` flag to specify the hostname for
+grouping snapshots. They now permit selecting the hostname via the
+environment variable `RESTIC_HOST`. `--host` still takes precedence over the
+environment variable.
+
+https://github.com/restic/restic/issues/4733
+https://github.com/restic/restic/pull/4734

cmd/restic/cmd_backup.go

@@ -114,7 +114,7 @@ func init() {
 	f.BoolVar(&backupOptions.StdinCommand, "stdin-from-command", false, "interpret arguments as command to execute and store its stdout")
 	f.Var(&backupOptions.Tags, "tag", "add `tags` for the new snapshot in the format `tag[,tag,...]` (can be specified multiple times)")
 	f.UintVar(&backupOptions.ReadConcurrency, "read-concurrency", 0, "read `n` files concurrently (default: $RESTIC_READ_CONCURRENCY or 2)")
-	f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually. To prevent an expensive rescan use the \"parent\" flag")
+	f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually (default: $RESTIC_HOST). To prevent an expensive rescan use the \"parent\" flag")
 	f.StringVar(&backupOptions.Host, "hostname", "", "set the `hostname` for the snapshot manually")
 	err := f.MarkDeprecated("hostname", "use --host")
 	if err != nil {
@@ -137,6 +137,11 @@ func init() {
 	// parse read concurrency from env, on error the default value will be used
 	readConcurrency, _ := strconv.ParseUint(os.Getenv("RESTIC_READ_CONCURRENCY"), 10, 32)
 	backupOptions.ReadConcurrency = uint(readConcurrency)
+
+	// parse host from env, if not exists or empty the default value will be used
+	if host := os.Getenv("RESTIC_HOST"); host != "" {
+		backupOptions.Host = host
+	}
 }

 // filterExisting returns a slice of all existing items, or an error if no

cmd/restic/find.go

@@ -2,6 +2,7 @@ package main

 import (
 	"context"
+	"os"

 	"github.com/restic/restic/internal/restic"
 	"github.com/spf13/pflag"
@@ -14,17 +15,27 @@ func initMultiSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter,
 	if !addHostShorthand {
 		hostShorthand = ""
 	}
-	flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times)")
+	flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times) (default: $RESTIC_HOST)")
 	flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]` (can be specified multiple times)")
 	flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path` (can be specified multiple times)")
+
+	// set default based on env if set
+	if host := os.Getenv("RESTIC_HOST"); host != "" {
+		filt.Hosts = []string{host}
+	}
 }

 // initSingleSnapshotFilter is used for commands that work on a single snapshot
 // MUST be combined with restic.FindFilteredSnapshot
 func initSingleSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter) {
-	flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times)")
+	flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times) (default: $RESTIC_HOST)")
 	flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]`, when snapshot ID \"latest\" is given (can be specified multiple times)")
 	flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path`, when snapshot ID \"latest\" is given (can be specified multiple times)")
+
+	// set default based on env if set
+	if host := os.Getenv("RESTIC_HOST"); host != "" {
+		filt.Hosts = []string{host}
+	}
 }

 // FindFilteredSnapshots yields Snapshots, either given explicitly by `snapshotIDs` or filtered from the list of all snapshots.

cmd/restic/find_test.go (new file, +61)

@@ -0,0 +1,61 @@
+package main
+
+import (
+	"testing"
+
+	"github.com/restic/restic/internal/restic"
+	rtest "github.com/restic/restic/internal/test"
+	"github.com/spf13/pflag"
+)
+
+func TestSnapshotFilter(t *testing.T) {
+	for _, test := range []struct {
+		name     string
+		args     []string
+		expected []string
+		env      string
+	}{
+		{
+			"no value",
+			[]string{},
+			nil,
+			"",
+		},
+		{
+			"args only",
+			[]string{"--host", "abc"},
+			[]string{"abc"},
+			"",
+		},
+		{
+			"env default",
+			[]string{},
+			[]string{"def"},
+			"def",
+		},
+		{
+			"both",
+			[]string{"--host", "abc"},
+			[]string{"abc"},
+			"def",
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			t.Setenv("RESTIC_HOST", test.env)
+
+			for _, mode := range []bool{false, true} {
+				set := pflag.NewFlagSet("test", pflag.PanicOnError)
+				flt := &restic.SnapshotFilter{}
+				if mode {
+					initMultiSnapshotFilter(set, flt, false)
+				} else {
+					initSingleSnapshotFilter(set, flt)
+				}
+
+				err := set.Parse(test.args)
+				rtest.OK(t, err)
+
+				rtest.Equals(t, test.expected, flt.Hosts, "unexpected hosts")
+			}
+		})
+	}
+}

internal/checker/checker.go

@@ -567,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 		hrd := hashing.NewReader(rd, sha256.New())
 		bufRd.Reset(hrd)

-		it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
+		it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
 		for {
 			val, err := it.Next()
 			if err == repository.ErrPackEOF {
@@ -653,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
 	return nil
 }

+type bufReader struct {
+	rd  *bufio.Reader
+	buf []byte
+}
+
+func newBufReader(rd *bufio.Reader) *bufReader {
+	return &bufReader{
+		rd: rd,
+	}
+}
+
+func (b *bufReader) Discard(n int) (discarded int, err error) {
+	return b.rd.Discard(n)
+}
+
+func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
+	if cap(b.buf) < n {
+		b.buf = make([]byte, n)
+	}
+	b.buf = b.buf[:n]
+
+	_, err = io.ReadFull(b.rd, b.buf)
+	if err != nil {
+		return nil, err
+	}
+	return b.buf, nil
+}
+
 // ReadData loads all data from the repository and checks the integrity.
 func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
 	c.ReadPacks(ctx, c.packs, nil, errChan)
 }

+const maxStreamBufferSize = 4 * 1024 * 1024
+
 // ReadPacks loads data from specified packs and checks the integrity.
 func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 	defer close(errChan)
@@ -675,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
 		// run workers
 		for i := 0; i < workerCount; i++ {
 			g.Go(func() error {
-				// create a buffer that is large enough to be reused by repository.StreamPack
-				// this ensures that we can read the pack header later on
-				bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
+				bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
 				dec, err := zstd.NewReader(nil)
 				if err != nil {
 					panic(dec)

internal/repository/repack.go

@@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
 		for t := range downloadQueue {
 			err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
 				if err != nil {
-					var ierr error
-					// check whether we can get a valid copy somewhere else
-					buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
-					if ierr != nil {
-						// no luck, return the original error
-						return err
-					}
+					// a required blob couldn't be retrieved
+					return err
 				}

 				keepMutex.Lock()

internal/repository/repository.go

@@ -1,7 +1,6 @@
 package repository

 import (
-	"bufio"
 	"bytes"
 	"context"
 	"fmt"
@@ -12,7 +11,6 @@ import (
 	"sort"
 	"sync"

-	"github.com/cenkalti/backoff/v4"
 	"github.com/klauspost/compress/zstd"
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/backend"
@@ -29,8 +27,6 @@ import (
 	"golang.org/x/sync/errgroup"
 )

-const MaxStreamBufferSize = 4 * 1024 * 1024
-
 const MinPackSize = 4 * 1024 * 1024
 const DefaultPackSize = 16 * 1024 * 1024
 const MaxPackSize = 128 * 1024 * 1024
@@ -966,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
 }

 type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
+type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)

-// Skip sections with more than 4MB unused blobs
-const maxUnusedRange = 4 * 1024 * 1024
+// Skip sections with more than 1MB unused blobs
+const maxUnusedRange = 1 * 1024 * 1024

 // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
 // the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
 // handleBlobFn is called at most once for each blob. If the callback returns an error,
-// then LoadBlobsFromPack will abort and not retry it.
+// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
+// this specific call. The callback must not keep a reference to buf.
 func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
-	return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
+	return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
 }

-func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	if len(blobs) == 0 {
 		// nothing to do
 		return nil
@@ -990,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
 	lowerIdx := 0
 	lastPos := blobs[0].Offset

+	const maxChunkSize = 2 * DefaultPackSize
+
 	for i := 0; i < len(blobs); i++ {
 		if blobs[i].Offset < lastPos {
 			// don't wait for streamPackPart to fail
 			return errors.Errorf("overlapping blobs in pack %v", packID)
 		}
+
+		chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
+		split := false
+		// split if the chunk would become larger than maxChunkSize. Oversized chunks are
+		// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
+		if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
+			split = true
+		}
+		// skip too large gaps as a new request is typically much cheaper than data transfers
 		if blobs[i].Offset-lastPos > maxUnusedRange {
+			split = true
+		}
+
+		if split {
 			// load everything up to the skipped file section
-			err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
+			err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
 			if err != nil {
 				return err
 			}
@@ -1006,10 +1019,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
 		lastPos = blobs[i].Offset + blobs[i].Length
 	}

 	// load remainder
-	return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
+	return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
 }

-func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
+func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}

 	dataStart := blobs[0].Offset
@@ -1017,57 +1030,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
 	debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))

-	dec, err := zstd.NewReader(nil)
-	if err != nil {
-		panic(dec)
-	}
-	defer dec.Close()
-
-	ctx, cancel := context.WithCancel(ctx)
-	// stream blobs in pack
-	err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
-		// prevent callbacks after cancellation
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-		bufferSize := int(dataEnd - dataStart)
-		if bufferSize > MaxStreamBufferSize {
-			bufferSize = MaxStreamBufferSize
-		}
-		bufRd := bufio.NewReaderSize(rd, bufferSize)
-		it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
-		for {
-			val, err := it.Next()
-			if err == ErrPackEOF {
-				break
-			} else if err != nil {
-				return err
-			}
-
-			err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
-			if err != nil {
-				cancel()
-				return backoff.Permanent(err)
-			}
-			// ensure that each blob is only passed once to handleBlobFn
-			blobs = blobs[1:]
-		}
-		return nil
+	data := make([]byte, int(dataEnd-dataStart))
+	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
+		_, cerr := io.ReadFull(rd, data)
+		return cerr
 	})
+	// prevent callbacks after cancellation
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+	if err != nil {
+		// the context is only still valid if handleBlobFn never returned an error
+		if loadBlobFn != nil {
+			// check whether we can get the remaining blobs somewhere else
+			for _, entry := range blobs {
+				buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
+				err = handleBlobFn(entry.BlobHandle, buf, ierr)
+				if err != nil {
+					break
+				}
+			}
+		}
+		return errors.Wrap(err, "StreamPack")
+	}
+
+	it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
+
+	for {
+		val, err := it.Next()
+		if err == ErrPackEOF {
+			break
+		} else if err != nil {
+			return err
+		}
+
+		if val.Err != nil && loadBlobFn != nil {
+			var ierr error
+			// check whether we can get a valid copy somewhere else
+			buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
+			if ierr == nil {
+				// success
+				val.Plaintext = buf
+				val.Err = nil
+			}
+		}
+
+		err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
+		if err != nil {
+			return err
+		}
+		// ensure that each blob is only passed once to handleBlobFn
+		blobs = blobs[1:]
+	}
+
 	return errors.Wrap(err, "StreamPack")
 }

+// discardReader allows the PackBlobIterator to perform zero copy
+// reads if the underlying data source is a byte slice.
+type discardReader interface {
+	Discard(n int) (discarded int, err error)
+	// ReadFull reads the next n bytes into a byte slice. The caller must not
+	// retain a reference to the byte slice. Modifications are only allowed within
+	// the boundaries of the returned slice.
+	ReadFull(n int) (buf []byte, err error)
+}
+
+type byteReader struct {
+	buf []byte
+}
+
+func newByteReader(buf []byte) *byteReader {
+	return &byteReader{
+		buf: buf,
+	}
+}
+
+func (b *byteReader) Discard(n int) (discarded int, err error) {
+	if len(b.buf) < n {
+		return 0, io.ErrUnexpectedEOF
+	}
+	b.buf = b.buf[n:]
+	return n, nil
+}
+
+func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
+	if len(b.buf) < n {
+		return nil, io.ErrUnexpectedEOF
+	}
+	buf = b.buf[:n]
+	b.buf = b.buf[n:]
+	return buf, nil
+}
+
 type PackBlobIterator struct {
 	packID        restic.ID
-	rd            *bufio.Reader
+	rd            discardReader
 	currentOffset uint

 	blobs []restic.Blob
 	key   *crypto.Key
 	dec   *zstd.Decoder

-	buf    []byte
 	decode []byte
 }
@@ -1079,7 +1143,7 @@ type PackBlobValue struct {

 var ErrPackEOF = errors.New("reached EOF of pack file")

-func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
+func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
 	blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
 	return &PackBlobIterator{
 		packID:        packID,
@@ -1114,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
 	debug.Log("  process blob %v, skipped %d, %v", h, skipBytes, entry)

-	if uint(cap(b.buf)) < entry.Length {
-		b.buf = make([]byte, entry.Length)
-	}
-	b.buf = b.buf[:entry.Length]
-
-	n, err := io.ReadFull(b.rd, b.buf)
+	buf, err := b.rd.ReadFull(int(entry.Length))
 	if err != nil {
 		debug.Log("  read error %v", err)
 		return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
 	}
-	if n != len(b.buf) {
-		return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
-			h, b.packID.Str(), len(b.buf), n)
-	}

 	b.currentOffset = entry.Offset + entry.Length

 	if int(entry.Length) <= b.key.NonceSize() {
@@ -1137,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
 	}

 	// decryption errors are likely permanent, give the caller a chance to skip them
-	nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
+	nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
 	plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
 	if err != nil {
 		err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)

[package repository tests]

@@ -146,14 +146,14 @@ func TestStreamPack(t *testing.T) {
 }

 func testStreamPack(t *testing.T, version uint) {
-	// always use the same key for deterministic output
-	const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
-
-	var key crypto.Key
-	err := json.Unmarshal([]byte(jsonKey), &key)
+	dec, err := zstd.NewReader(nil)
 	if err != nil {
-		t.Fatal(err)
+		panic(dec)
 	}
+	defer dec.Close()
+
+	// always use the same key for deterministic output
+	key := testKey(t)

 	blobSizes := []int{
 		5522811,
@@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
 			loadCalls = 0
 			shortFirstLoad = test.shortFirstLoad

-			err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
 				return err
 			}

-			err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
+			err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
 			if err == nil {
 				t.Fatalf("wanted error %v, got nil", test.err)
 			}
@@ -449,3 +449,83 @@ func TestUnpackedVerification(t *testing.T) {
 		}
 	}
 }
+
+func testKey(t *testing.T) crypto.Key {
+	const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
+
+	var key crypto.Key
+	err := json.Unmarshal([]byte(jsonKey), &key)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return key
+}
+
+func TestStreamPackFallback(t *testing.T) {
+	dec, err := zstd.NewReader(nil)
+	if err != nil {
+		panic(dec)
+	}
+	defer dec.Close()
+
+	test := func(t *testing.T, failLoad bool) {
+		key := testKey(t)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		plaintext := rtest.Random(800, 42)
+		blobID := restic.Hash(plaintext)
+		blobs := []restic.Blob{
+			{
+				Length: uint(crypto.CiphertextLength(len(plaintext))),
+				Offset: 0,
+				BlobHandle: restic.BlobHandle{
+					ID:   blobID,
+					Type: restic.DataBlob,
+				},
+			},
+		}
+
+		var loadPack backendLoadFn
+		if failLoad {
+			loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+				return errors.New("load error")
+			}
+		} else {
+			loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+				// just return an empty array to provoke an error
+				data := make([]byte, length)
+				return fn(bytes.NewReader(data))
+			}
+		}
+
+		loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
+			if id == blobID {
+				return plaintext, nil
+			}
+			return nil, errors.New("unknown blob")
+		}
+
+		blobOK := false
+		handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
+			rtest.OK(t, err)
+			rtest.Equals(t, blobID, blob.ID)
+			rtest.Equals(t, plaintext, buf)
+			blobOK = true
+			return err
+		}
+
+		err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
+		rtest.OK(t, err)
+		rtest.Assert(t, blobOK, "blob failed to load")
+	}
+
+	t.Run("corrupted blob", func(t *testing.T) {
+		test(t, false)
+	})
+
+	// test fallback for failed pack loading
+	t.Run("failed load", func(t *testing.T) {
+		test(t, true)
+	})
+}