mirror of https://github.com/restic/restic.git
Compare commits
8 Commits
c894065beb
...
2ab863a0f4
Author | SHA1 | Date |
---|---|---|
Michael Eischer | 2ab863a0f4 | |
Michael Eischer | faffd15d13 | |
Michael Eischer | 347e9d0765 | |
Altan Orhon | 871ea1eaf3 | |
Michael Eischer | 20d8eed400 | |
Michael Eischer | cf700d8794 | |
Michael Eischer | 666a0b0bdb | |
Michael Eischer | 621012dac0 |
|
@ -0,0 +1,10 @@
|
|||
Enhancement: Allow specifying `--host` via environment variable
|
||||
|
||||
Restic commands that operate on snapshots, such as `restic backup` and
|
||||
`restic snapshots`, support the `--host` flag to specify the hostname for
|
||||
grouoping snapshots. They now permit selecting the hostname via the
|
||||
environment variable `RESTIC_HOST`. `--host` still takes precedence over the
|
||||
environment variable.
|
||||
|
||||
https://github.com/restic/restic/issues/4733
|
||||
https://github.com/restic/restic/pull/4734
|
|
@ -114,7 +114,7 @@ func init() {
|
|||
f.BoolVar(&backupOptions.StdinCommand, "stdin-from-command", false, "interpret arguments as command to execute and store its stdout")
|
||||
f.Var(&backupOptions.Tags, "tag", "add `tags` for the new snapshot in the format `tag[,tag,...]` (can be specified multiple times)")
|
||||
f.UintVar(&backupOptions.ReadConcurrency, "read-concurrency", 0, "read `n` files concurrently (default: $RESTIC_READ_CONCURRENCY or 2)")
|
||||
f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually. To prevent an expensive rescan use the \"parent\" flag")
|
||||
f.StringVarP(&backupOptions.Host, "host", "H", "", "set the `hostname` for the snapshot manually (default: $RESTIC_HOST). To prevent an expensive rescan use the \"parent\" flag")
|
||||
f.StringVar(&backupOptions.Host, "hostname", "", "set the `hostname` for the snapshot manually")
|
||||
err := f.MarkDeprecated("hostname", "use --host")
|
||||
if err != nil {
|
||||
|
@ -137,6 +137,11 @@ func init() {
|
|||
// parse read concurrency from env, on error the default value will be used
|
||||
readConcurrency, _ := strconv.ParseUint(os.Getenv("RESTIC_READ_CONCURRENCY"), 10, 32)
|
||||
backupOptions.ReadConcurrency = uint(readConcurrency)
|
||||
|
||||
// parse host from env, if not exists or empty the default value will be used
|
||||
if host := os.Getenv("RESTIC_HOST"); host != "" {
|
||||
backupOptions.Host = host
|
||||
}
|
||||
}
|
||||
|
||||
// filterExisting returns a slice of all existing items, or an error if no
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/spf13/pflag"
|
||||
|
@ -14,17 +15,27 @@ func initMultiSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter,
|
|||
if !addHostShorthand {
|
||||
hostShorthand = ""
|
||||
}
|
||||
flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times)")
|
||||
flags.StringArrayVarP(&filt.Hosts, "host", hostShorthand, nil, "only consider snapshots for this `host` (can be specified multiple times) (default: $RESTIC_HOST)")
|
||||
flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]` (can be specified multiple times)")
|
||||
flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path` (can be specified multiple times)")
|
||||
|
||||
// set default based on env if set
|
||||
if host := os.Getenv("RESTIC_HOST"); host != "" {
|
||||
filt.Hosts = []string{host}
|
||||
}
|
||||
}
|
||||
|
||||
// initSingleSnapshotFilter is used for commands that work on a single snapshot
|
||||
// MUST be combined with restic.FindFilteredSnapshot
|
||||
func initSingleSnapshotFilter(flags *pflag.FlagSet, filt *restic.SnapshotFilter) {
|
||||
flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times)")
|
||||
flags.StringArrayVarP(&filt.Hosts, "host", "H", nil, "only consider snapshots for this `host`, when snapshot ID \"latest\" is given (can be specified multiple times) (default: $RESTIC_HOST)")
|
||||
flags.Var(&filt.Tags, "tag", "only consider snapshots including `tag[,tag,...]`, when snapshot ID \"latest\" is given (can be specified multiple times)")
|
||||
flags.StringArrayVar(&filt.Paths, "path", nil, "only consider snapshots including this (absolute) `path`, when snapshot ID \"latest\" is given (can be specified multiple times)")
|
||||
|
||||
// set default based on env if set
|
||||
if host := os.Getenv("RESTIC_HOST"); host != "" {
|
||||
filt.Hosts = []string{host}
|
||||
}
|
||||
}
|
||||
|
||||
// FindFilteredSnapshots yields Snapshots, either given explicitly by `snapshotIDs` or filtered from the list of all snapshots.
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func TestSnapshotFilter(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
args []string
|
||||
expected []string
|
||||
env string
|
||||
}{
|
||||
{
|
||||
"no value",
|
||||
[]string{},
|
||||
nil,
|
||||
"",
|
||||
},
|
||||
{
|
||||
"args only",
|
||||
[]string{"--host", "abc"},
|
||||
[]string{"abc"},
|
||||
"",
|
||||
},
|
||||
{
|
||||
"env default",
|
||||
[]string{},
|
||||
[]string{"def"},
|
||||
"def",
|
||||
},
|
||||
{
|
||||
"both",
|
||||
[]string{"--host", "abc"},
|
||||
[]string{"abc"},
|
||||
"def",
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
t.Setenv("RESTIC_HOST", test.env)
|
||||
|
||||
for _, mode := range []bool{false, true} {
|
||||
set := pflag.NewFlagSet("test", pflag.PanicOnError)
|
||||
flt := &restic.SnapshotFilter{}
|
||||
if mode {
|
||||
initMultiSnapshotFilter(set, flt, false)
|
||||
} else {
|
||||
initSingleSnapshotFilter(set, flt)
|
||||
}
|
||||
err := set.Parse(test.args)
|
||||
rtest.OK(t, err)
|
||||
|
||||
rtest.Equals(t, test.expected, flt.Hosts, "unexpected hosts")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -567,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
|
|||
hrd := hashing.NewReader(rd, sha256.New())
|
||||
bufRd.Reset(hrd)
|
||||
|
||||
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
|
||||
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
|
||||
for {
|
||||
val, err := it.Next()
|
||||
if err == repository.ErrPackEOF {
|
||||
|
@ -653,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
|
|||
return nil
|
||||
}
|
||||
|
||||
type bufReader struct {
|
||||
rd *bufio.Reader
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newBufReader(rd *bufio.Reader) *bufReader {
|
||||
return &bufReader{
|
||||
rd: rd,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *bufReader) Discard(n int) (discarded int, err error) {
|
||||
return b.rd.Discard(n)
|
||||
}
|
||||
|
||||
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
|
||||
if cap(b.buf) < n {
|
||||
b.buf = make([]byte, n)
|
||||
}
|
||||
b.buf = b.buf[:n]
|
||||
|
||||
_, err = io.ReadFull(b.rd, b.buf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.buf, nil
|
||||
}
|
||||
|
||||
// ReadData loads all data from the repository and checks the integrity.
|
||||
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
|
||||
c.ReadPacks(ctx, c.packs, nil, errChan)
|
||||
}
|
||||
|
||||
const maxStreamBufferSize = 4 * 1024 * 1024
|
||||
|
||||
// ReadPacks loads data from specified packs and checks the integrity.
|
||||
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
|
||||
defer close(errChan)
|
||||
|
@ -675,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
|
|||
// run workers
|
||||
for i := 0; i < workerCount; i++ {
|
||||
g.Go(func() error {
|
||||
// create a buffer that is large enough to be reused by repository.StreamPack
|
||||
// this ensures that we can read the pack header later on
|
||||
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
|
||||
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
|
||||
dec, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
panic(dec)
|
||||
|
|
|
@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
|
|||
for t := range downloadQueue {
|
||||
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
var ierr error
|
||||
// check whether we can get a valid copy somewhere else
|
||||
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
||||
if ierr != nil {
|
||||
// no luck, return the original error
|
||||
return err
|
||||
}
|
||||
// a required blob couldn't be retrieved
|
||||
return err
|
||||
}
|
||||
|
||||
keepMutex.Lock()
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
|
@ -12,7 +11,6 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
|
@ -29,8 +27,6 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const MaxStreamBufferSize = 4 * 1024 * 1024
|
||||
|
||||
const MinPackSize = 4 * 1024 * 1024
|
||||
const DefaultPackSize = 16 * 1024 * 1024
|
||||
const MaxPackSize = 128 * 1024 * 1024
|
||||
|
@ -966,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
|
|||
}
|
||||
|
||||
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
||||
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)
|
||||
|
||||
// Skip sections with more than 4MB unused blobs
|
||||
const maxUnusedRange = 4 * 1024 * 1024
|
||||
// Skip sections with more than 1MB unused blobs
|
||||
const maxUnusedRange = 1 * 1024 * 1024
|
||||
|
||||
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
|
||||
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
|
||||
// handleBlobFn is called at most once for each blob. If the callback returns an error,
|
||||
// then LoadBlobsFromPack will abort and not retry it.
|
||||
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
|
||||
// this specific call. The callback must not keep a reference to buf.
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
|
||||
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
if len(blobs) == 0 {
|
||||
// nothing to do
|
||||
return nil
|
||||
|
@ -990,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
|
|||
|
||||
lowerIdx := 0
|
||||
lastPos := blobs[0].Offset
|
||||
const maxChunkSize = 2 * DefaultPackSize
|
||||
|
||||
for i := 0; i < len(blobs); i++ {
|
||||
if blobs[i].Offset < lastPos {
|
||||
// don't wait for streamPackPart to fail
|
||||
return errors.Errorf("overlapping blobs in pack %v", packID)
|
||||
}
|
||||
|
||||
chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
|
||||
split := false
|
||||
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
|
||||
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
|
||||
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
|
||||
split = true
|
||||
}
|
||||
// skip too large gaps as a new request is typically much cheaper than data transfers
|
||||
if blobs[i].Offset-lastPos > maxUnusedRange {
|
||||
split = true
|
||||
}
|
||||
|
||||
if split {
|
||||
// load everything up to the skipped file section
|
||||
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
|
||||
err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1006,10 +1019,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
|
|||
lastPos = blobs[i].Offset + blobs[i].Length
|
||||
}
|
||||
// load remainder
|
||||
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
|
||||
|
||||
dataStart := blobs[0].Offset
|
||||
|
@ -1017,57 +1030,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
|
|||
|
||||
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
|
||||
|
||||
dec, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
panic(dec)
|
||||
}
|
||||
defer dec.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
// stream blobs in pack
|
||||
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
|
||||
// prevent callbacks after cancellation
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
bufferSize := int(dataEnd - dataStart)
|
||||
if bufferSize > MaxStreamBufferSize {
|
||||
bufferSize = MaxStreamBufferSize
|
||||
}
|
||||
bufRd := bufio.NewReaderSize(rd, bufferSize)
|
||||
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
|
||||
|
||||
for {
|
||||
val, err := it.Next()
|
||||
if err == ErrPackEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
// ensure that each blob is only passed once to handleBlobFn
|
||||
blobs = blobs[1:]
|
||||
}
|
||||
return nil
|
||||
data := make([]byte, int(dataEnd-dataStart))
|
||||
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
|
||||
_, cerr := io.ReadFull(rd, data)
|
||||
return cerr
|
||||
})
|
||||
// prevent callbacks after cancellation
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if err != nil {
|
||||
// the context is only still valid if handleBlobFn never returned an error
|
||||
if loadBlobFn != nil {
|
||||
// check whether we can get the remaining blobs somewhere else
|
||||
for _, entry := range blobs {
|
||||
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
|
||||
err = handleBlobFn(entry.BlobHandle, buf, ierr)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.Wrap(err, "StreamPack")
|
||||
}
|
||||
|
||||
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
|
||||
|
||||
for {
|
||||
val, err := it.Next()
|
||||
if err == ErrPackEOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if val.Err != nil && loadBlobFn != nil {
|
||||
var ierr error
|
||||
// check whether we can get a valid copy somewhere else
|
||||
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
|
||||
if ierr == nil {
|
||||
// success
|
||||
val.Plaintext = buf
|
||||
val.Err = nil
|
||||
}
|
||||
}
|
||||
|
||||
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// ensure that each blob is only passed once to handleBlobFn
|
||||
blobs = blobs[1:]
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "StreamPack")
|
||||
}
|
||||
|
||||
// discardReader allows the PackBlobIterator to perform zero copy
|
||||
// reads if the underlying data source is a byte slice.
|
||||
type discardReader interface {
|
||||
Discard(n int) (discarded int, err error)
|
||||
// ReadFull reads the next n bytes into a byte slice. The caller must not
|
||||
// retain a reference to the byte. Modifications are only allowed within
|
||||
// the boundaries of the returned slice.
|
||||
ReadFull(n int) (buf []byte, err error)
|
||||
}
|
||||
|
||||
type byteReader struct {
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newByteReader(buf []byte) *byteReader {
|
||||
return &byteReader{
|
||||
buf: buf,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *byteReader) Discard(n int) (discarded int, err error) {
|
||||
if len(b.buf) < n {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b.buf = b.buf[n:]
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
|
||||
if len(b.buf) < n {
|
||||
return nil, io.ErrUnexpectedEOF
|
||||
}
|
||||
buf = b.buf[:n]
|
||||
b.buf = b.buf[n:]
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
type PackBlobIterator struct {
|
||||
packID restic.ID
|
||||
rd *bufio.Reader
|
||||
rd discardReader
|
||||
currentOffset uint
|
||||
|
||||
blobs []restic.Blob
|
||||
key *crypto.Key
|
||||
dec *zstd.Decoder
|
||||
|
||||
buf []byte
|
||||
decode []byte
|
||||
}
|
||||
|
||||
|
@ -1079,7 +1143,7 @@ type PackBlobValue struct {
|
|||
|
||||
var ErrPackEOF = errors.New("reached EOF of pack file")
|
||||
|
||||
func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
|
||||
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
|
||||
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
|
||||
return &PackBlobIterator{
|
||||
packID: packID,
|
||||
|
@ -1114,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
|
|||
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
|
||||
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
|
||||
|
||||
if uint(cap(b.buf)) < entry.Length {
|
||||
b.buf = make([]byte, entry.Length)
|
||||
}
|
||||
b.buf = b.buf[:entry.Length]
|
||||
|
||||
n, err := io.ReadFull(b.rd, b.buf)
|
||||
buf, err := b.rd.ReadFull(int(entry.Length))
|
||||
if err != nil {
|
||||
debug.Log(" read error %v", err)
|
||||
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
|
||||
}
|
||||
|
||||
if n != len(b.buf) {
|
||||
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
|
||||
h, b.packID.Str(), len(b.buf), n)
|
||||
}
|
||||
b.currentOffset = entry.Offset + entry.Length
|
||||
|
||||
if int(entry.Length) <= b.key.NonceSize() {
|
||||
|
@ -1137,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
|
|||
}
|
||||
|
||||
// decryption errors are likely permanent, give the caller a chance to skip them
|
||||
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
|
||||
nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
|
||||
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)
|
||||
|
|
|
@ -146,14 +146,14 @@ func TestStreamPack(t *testing.T) {
|
|||
}
|
||||
|
||||
func testStreamPack(t *testing.T, version uint) {
|
||||
// always use the same key for deterministic output
|
||||
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
|
||||
|
||||
var key crypto.Key
|
||||
err := json.Unmarshal([]byte(jsonKey), &key)
|
||||
dec, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
panic(dec)
|
||||
}
|
||||
defer dec.Close()
|
||||
|
||||
// always use the same key for deterministic output
|
||||
key := testKey(t)
|
||||
|
||||
blobSizes := []int{
|
||||
5522811,
|
||||
|
@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
|
|||
|
||||
loadCalls = 0
|
||||
shortFirstLoad = test.shortFirstLoad
|
||||
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
|
|||
return err
|
||||
}
|
||||
|
||||
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
if err == nil {
|
||||
t.Fatalf("wanted error %v, got nil", test.err)
|
||||
}
|
||||
|
@ -449,3 +449,83 @@ func TestUnpackedVerification(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testKey(t *testing.T) crypto.Key {
|
||||
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
|
||||
|
||||
var key crypto.Key
|
||||
err := json.Unmarshal([]byte(jsonKey), &key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
func TestStreamPackFallback(t *testing.T) {
|
||||
dec, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
panic(dec)
|
||||
}
|
||||
defer dec.Close()
|
||||
|
||||
test := func(t *testing.T, failLoad bool) {
|
||||
key := testKey(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
plaintext := rtest.Random(800, 42)
|
||||
blobID := restic.Hash(plaintext)
|
||||
blobs := []restic.Blob{
|
||||
{
|
||||
Length: uint(crypto.CiphertextLength(len(plaintext))),
|
||||
Offset: 0,
|
||||
BlobHandle: restic.BlobHandle{
|
||||
ID: blobID,
|
||||
Type: restic.DataBlob,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var loadPack backendLoadFn
|
||||
if failLoad {
|
||||
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
return errors.New("load error")
|
||||
}
|
||||
} else {
|
||||
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
// just return an empty array to provoke an error
|
||||
data := make([]byte, length)
|
||||
return fn(bytes.NewReader(data))
|
||||
}
|
||||
}
|
||||
|
||||
loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
|
||||
if id == blobID {
|
||||
return plaintext, nil
|
||||
}
|
||||
return nil, errors.New("unknown blob")
|
||||
}
|
||||
|
||||
blobOK := false
|
||||
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, blobID, blob.ID)
|
||||
rtest.Equals(t, plaintext, buf)
|
||||
blobOK = true
|
||||
return err
|
||||
}
|
||||
|
||||
err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, blobOK, "blob failed to load")
|
||||
}
|
||||
|
||||
t.Run("corrupted blob", func(t *testing.T) {
|
||||
test(t, false)
|
||||
})
|
||||
|
||||
// test fallback for failed pack loading
|
||||
t.Run("failed load", func(t *testing.T) {
|
||||
test(t, true)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue