Compare commits

...

19 Commits

Author SHA1 Message Date
Michael Eischer c894065beb
Merge 20d8eed400 into a7b5e09902 2024-04-24 21:59:12 +02:00
Michael Eischer a7b5e09902
Merge pull request #4753 from MichaelEischer/remove-cleanup-handlers
Replace cleanup handlers with context based command cancelation
2024-04-24 21:34:19 +02:00
Michael Eischer 3f9d50865d
Merge pull request #4776 from MichaelEischer/cleanup-backend-open
unify backend open and create
2024-04-24 21:24:27 +02:00
Michael Eischer 5f263752d7 init: also apply limiter for non-HTTP backend 2024-04-24 20:42:30 +02:00
Michael Eischer 484dbb1cf4 get rid of a few global variables 2024-04-22 22:39:33 +02:00
Michael Eischer 940a3159b5 let index.Each() and pack.Size() return error on canceled context
This forces a caller to actually check that the function did complete.
2024-04-22 22:39:32 +02:00
Michael Eischer 31624aeffd Improve command shutdown on context cancellation 2024-04-22 22:31:38 +02:00
Michael Eischer 910927670f mount: fix exit code on cancellation 2024-04-22 22:27:19 +02:00
Michael Eischer 6f2a4dea21 remove global shutdown hook 2024-04-22 22:27:19 +02:00
Michael Eischer 699ef5e9de debug: replace cleanup handler usage in profiling setup 2024-04-22 22:27:19 +02:00
Michael Eischer eb710a28e8 use standalone shutdown hook for readPasswordTerminal
move terminal restoration into readPasswordTerminal
2024-04-22 22:27:19 +02:00
Michael Eischer 86c7909f41 mount: use standalone shutdown hook via goroutine 2024-04-22 22:27:19 +02:00
Michael Eischer 93135dc705 lock: drop cleanup handler 2024-04-22 22:27:19 +02:00
Michael Eischer 21a7cb405c check: replace cleanup handler 2024-04-22 22:27:19 +02:00
Michael Eischer 20d8eed400 repository: streamPack: separate requests for gap larger than 1MB
With most cloud providers, traffic is much more expensive than API
calls. Thus slightly bias streamPack towards a bit more API calls in
exchange for slightly less traffic.
2024-04-22 21:21:23 +02:00
Michael Eischer cf700d8794 repository: streamPack: reuse zstd decoder 2024-04-22 21:21:23 +02:00
Michael Eischer 666a0b0bdb repository: streamPack: replace streaming with chunked download
Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remains open. This
introduces partial failures which massively complicate the error
handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.

To avoid multiple copies in memory, an auxiliary interface
`discardReader` is introduced that allows directly accessing the
downloaded byte slices, while still supporting the streaming used by the
`check` command.
2024-04-22 21:21:23 +02:00
Michael Eischer 621012dac0 repository: Add blob loading fallback to LoadBlobsFromPack
Try to retrieve individual blobs via LoadBlob if streaming did not work.
2024-04-21 21:35:55 +02:00
Michael Eischer 6c6dceade3 global: unify backend open and create 2024-04-19 22:26:14 +02:00
45 changed files with 612 additions and 415 deletions

View File

@ -1,89 +1,41 @@
package main
import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"github.com/restic/restic/internal/debug"
)
var cleanupHandlers struct {
sync.Mutex
list []func(code int) (int, error)
done bool
ch chan os.Signal
func createGlobalContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan os.Signal, 1)
go cleanupHandler(ch, cancel)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ctx
}
func init() {
cleanupHandlers.ch = make(chan os.Signal, 1)
go CleanupHandler(cleanupHandlers.ch)
signal.Notify(cleanupHandlers.ch, syscall.SIGINT, syscall.SIGTERM)
}
// cleanupHandler handles the SIGINT and SIGTERM signals.
func cleanupHandler(c <-chan os.Signal, cancel context.CancelFunc) {
s := <-c
debug.Log("signal %v received, cleaning up", s)
Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)
// AddCleanupHandler adds the function f to the list of cleanup handlers so
// that it is executed when all the cleanup handlers are run, e.g. when SIGINT
// is received.
func AddCleanupHandler(f func(code int) (int, error)) {
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()
// reset the done flag for integration tests
cleanupHandlers.done = false
cleanupHandlers.list = append(cleanupHandlers.list, f)
}
// RunCleanupHandlers runs all registered cleanup handlers
func RunCleanupHandlers(code int) int {
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()
if cleanupHandlers.done {
return code
if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
_, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
_, _ = os.Stderr.WriteString(debug.DumpStacktrace())
_, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
}
cleanupHandlers.done = true
for _, f := range cleanupHandlers.list {
var err error
code, err = f(code)
if err != nil {
Warnf("error in cleanup handler: %v\n", err)
}
}
cleanupHandlers.list = nil
return code
cancel()
}
// CleanupHandler handles the SIGINT and SIGTERM signals.
func CleanupHandler(c <-chan os.Signal) {
for s := range c {
debug.Log("signal %v received, cleaning up", s)
Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)
if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
_, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
_, _ = os.Stderr.WriteString(debug.DumpStacktrace())
_, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
}
code := 0
if s == syscall.SIGINT || s == syscall.SIGTERM {
code = 130
} else {
code = 1
}
Exit(code)
}
}
// Exit runs the cleanup handlers and then terminates the process with the
// given exit code.
// Exit terminates the process with the given exit code.
func Exit(code int) {
code = RunCleanupHandlers(code)
debug.Log("exiting with status code %d", code)
os.Exit(code)
}

View File

@ -199,10 +199,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
}
cleanup := prepareCheckCache(opts, &gopts)
AddCleanupHandler(func(code int) (int, error) {
cleanup()
return code, nil
})
defer cleanup()
if !gopts.NoLock {
Verbosef("create exclusive lock for repository\n")
@ -222,6 +219,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
Verbosef("load indexes\n")
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
hints, errs := chkr.LoadIndex(ctx, bar)
if ctx.Err() != nil {
return ctx.Err()
}
errorsFound := false
suggestIndexRebuild := false
@ -283,6 +283,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
if orphanedPacks > 0 {
Verbosef("%d additional files were found in the repo, which likely contain duplicate data.\nThis is non-critical, you can run `restic prune` to correct this.\n", orphanedPacks)
}
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("check snapshots, trees and blobs\n")
errChan = make(chan error)
@ -316,9 +319,16 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
// Must happen after `errChan` is read from in the above loop to avoid
// deadlocking in the case of errors.
wg.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
if opts.CheckUnused {
for _, id := range chkr.UnusedBlobs(ctx) {
unused, err := chkr.UnusedBlobs(ctx)
if err != nil {
return err
}
for _, id := range unused {
Verbosef("unused blob %v\n", id)
errorsFound = true
}
@ -395,10 +405,13 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
doReadData(packs)
}
if ctx.Err() != nil {
return ctx.Err()
}
if errorsFound {
return errors.Fatal("repository contains errors")
}
Verbosef("no errors were found\n")
return nil

View File

@ -53,7 +53,7 @@ func init() {
}
func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []string) error {
secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "destination")
secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "destination")
if err != nil {
return err
}
@ -103,6 +103,9 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
// also consider identical snapshot copies
dstSnapshotByOriginal[*sn.ID()] = append(dstSnapshotByOriginal[*sn.ID()], sn)
}
if ctx.Err() != nil {
return ctx.Err()
}
// remember already processed trees across all snapshots
visitedTrees := restic.NewIDSet()
@ -147,7 +150,7 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
}
Verbosef("snapshot %s saved\n", newID.Str())
}
return nil
return ctx.Err()
}
func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {

View File

@ -439,7 +439,10 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
if err != errAllPacksFound {
// try to resolve unknown pack ids from the index
packIDs = f.indexPacksToBlobs(ctx, packIDs)
packIDs, err = f.indexPacksToBlobs(ctx, packIDs)
if err != nil {
return err
}
}
if len(packIDs) > 0 {
@ -456,13 +459,13 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
return nil
}
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) map[string]struct{} {
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) (map[string]struct{}, error) {
wctx, cancel := context.WithCancel(ctx)
defer cancel()
// remember which packs were found in the index
indexPackIDs := make(map[string]struct{})
f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries
matchingID := false
@ -481,6 +484,9 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
indexPackIDs[idStr] = struct{}{}
}
})
if err != nil {
return nil, err
}
for id := range indexPackIDs {
delete(packIDs, id)
@ -493,7 +499,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
}
Warnf("some pack files are missing from the repository, getting their blobs from the repository index: %v\n\n", list)
}
return packIDs
return packIDs, nil
}
func (f *Finder) findObjectPack(id string, t restic.BlobType) {
@ -608,6 +614,9 @@ func runFind(ctx context.Context, opts FindOptions, gopts GlobalOptions, args []
for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, opts.Snapshots) {
filteredSnapshots = append(filteredSnapshots, sn)
}
if ctx.Err() != nil {
return ctx.Err()
}
sort.Slice(filteredSnapshots, func(i, j int) bool {
return filteredSnapshots[i].Time.Before(filteredSnapshots[j].Time)

View File

@ -188,6 +188,9 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
snapshots = append(snapshots, sn)
}
if ctx.Err() != nil {
return ctx.Err()
}
var jsonGroups []*ForgetGroup
@ -270,6 +273,10 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
}
}
if ctx.Err() != nil {
return ctx.Err()
}
if len(removeSnIDs) > 0 {
if !opts.DryRun {
bar := printer.NewCounter("files deleted")

View File

@ -80,7 +80,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []
return err
}
gopts.password, err = ReadPasswordTwice(gopts,
gopts.password, err = ReadPasswordTwice(ctx, gopts,
"enter password for new repository: ",
"enter password again: ")
if err != nil {
@ -131,7 +131,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []
func maybeReadChunkerPolynomial(ctx context.Context, opts InitOptions, gopts GlobalOptions) (*chunker.Pol, error) {
if opts.CopyChunkerParameters {
otherGopts, _, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "secondary")
otherGopts, _, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "secondary")
if err != nil {
return nil, err
}

View File

@ -60,7 +60,7 @@ func runKeyAdd(ctx context.Context, gopts GlobalOptions, opts KeyAddOptions, arg
}
func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyAddOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile)
pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil {
return err
}
@ -83,7 +83,7 @@ func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOption
// testKeyNewPassword is used to set a new password during integration testing.
var testKeyNewPassword string
func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error) {
func getNewPassword(ctx context.Context, gopts GlobalOptions, newPasswordFile string) (string, error) {
if testKeyNewPassword != "" {
return testKeyNewPassword, nil
}
@ -97,7 +97,7 @@ func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error)
newopts := gopts
newopts.password = ""
return ReadPasswordTwice(newopts,
return ReadPasswordTwice(ctx, newopts,
"enter new password: ",
"enter password again: ")
}

View File

@ -57,7 +57,7 @@ func runKeyPasswd(ctx context.Context, gopts GlobalOptions, opts KeyPasswdOption
}
func changePassword(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyPasswdOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile)
pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil {
return err
}

View File

@ -59,10 +59,9 @@ func runList(ctx context.Context, gopts GlobalOptions, args []string) error {
if err != nil {
return err
}
idx.Each(ctx, func(blobs restic.PackedBlob) {
return idx.Each(ctx, func(blobs restic.PackedBlob) {
Printf("%v %v\n", blobs.Type, blobs.ID)
})
return nil
})
default:
return errors.Fatal("invalid type")

View File

@ -152,28 +152,15 @@ func runMount(ctx context.Context, opts MountOptions, gopts GlobalOptions, args
}
}
AddCleanupHandler(func(code int) (int, error) {
debug.Log("running umount cleanup handler for mount at %v", mountpoint)
err := umount(mountpoint)
if err != nil {
Warnf("unable to umount (maybe already umounted or still in use?): %v\n", err)
}
// replace error code of sigint
if code == 130 {
code = 0
}
return code, nil
})
systemFuse.Debug = func(msg interface{}) {
debug.Log("fuse: %v", msg)
}
c, err := systemFuse.Mount(mountpoint, mountOptions...)
if err != nil {
return err
}
systemFuse.Debug = func(msg interface{}) {
debug.Log("fuse: %v", msg)
}
cfg := fuse.Config{
OwnerIsRoot: opts.OwnerRoot,
Filter: opts.SnapshotFilter,
@ -187,15 +174,26 @@ func runMount(ctx context.Context, opts MountOptions, gopts GlobalOptions, args
Printf("When finished, quit with Ctrl-c here or umount the mountpoint.\n")
debug.Log("serving mount at %v", mountpoint)
err = fs.Serve(c, root)
if err != nil {
return err
done := make(chan struct{})
go func() {
defer close(done)
err = fs.Serve(c, root)
}()
select {
case <-ctx.Done():
debug.Log("running umount cleanup handler for mount at %v", mountpoint)
err := systemFuse.Unmount(mountpoint)
if err != nil {
Warnf("unable to umount (maybe already umounted or still in use?): %v\n", err)
}
return ErrOK
case <-done:
// clean shutdown, nothing to do
}
<-c.Ready
return c.MountError
}
func umount(mountpoint string) error {
return systemFuse.Unmount(mountpoint)
return err
}

View File

@ -12,6 +12,7 @@ import (
"testing"
"time"
systemFuse "github.com/anacrolix/fuse"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
@ -65,7 +66,7 @@ func testRunMount(t testing.TB, gopts GlobalOptions, dir string, wg *sync.WaitGr
func testRunUmount(t testing.TB, dir string) {
var err error
for i := 0; i < mountWait; i++ {
if err = umount(dir); err == nil {
if err = systemFuse.Unmount(dir); err == nil {
t.Logf("directory %v umounted", dir)
return
}

View File

@ -197,6 +197,9 @@ func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOption
if err != nil {
return err
}
if ctx.Err() != nil {
return ctx.Err()
}
if popts.DryRun {
printer.P("\nWould have made the following changes:")

View File

@ -61,16 +61,22 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
// tree. If it is not referenced, we have a root tree.
trees := make(map[restic.ID]bool)
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if blob.Type == restic.TreeBlob {
trees[blob.Blob.ID] = false
}
})
if err != nil {
return err
}
Verbosef("load %d trees\n", len(trees))
bar = newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded")
for id := range trees {
tree, err := restic.LoadTree(ctx, repo, id)
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
Warnf("unable to load tree %v: %v\n", id.Str(), err)
continue

View File

@ -145,6 +145,9 @@ func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOpt
changedCount++
}
}
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("\n")
if changedCount == 0 {

View File

@ -294,6 +294,9 @@ func runRewrite(ctx context.Context, opts RewriteOptions, gopts GlobalOptions, a
changedCount++
}
}
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("\n")
if changedCount == 0 {

View File

@ -69,6 +69,9 @@ func runSnapshots(ctx context.Context, opts SnapshotOptions, gopts GlobalOptions
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
snapshots = append(snapshots, sn)
}
if ctx.Err() != nil {
return ctx.Err()
}
snapshotGroups, grouped, err := restic.GroupSnapshots(snapshots, opts.GroupBy)
if err != nil {
return err

View File

@ -117,9 +117,8 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args
return fmt.Errorf("error walking snapshot: %v", err)
}
}
if err != nil {
return err
if ctx.Err() != nil {
return ctx.Err()
}
if opts.countMode == countModeRawData {
@ -352,7 +351,10 @@ func statsDebug(ctx context.Context, repo restic.Repository) error {
Warnf("File Type: %v\n%v\n", t, hist)
}
hist := statsDebugBlobs(ctx, repo)
hist, err := statsDebugBlobs(ctx, repo)
if err != nil {
return err
}
for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} {
Warnf("Blob Type: %v\n%v\n\n", t, hist[t])
}
@ -370,17 +372,17 @@ func statsDebugFileType(ctx context.Context, repo restic.Lister, tpe restic.File
return hist, err
}
func statsDebugBlobs(ctx context.Context, repo restic.Repository) [restic.NumBlobTypes]*sizeHistogram {
func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBlobTypes]*sizeHistogram, error) {
var hist [restic.NumBlobTypes]*sizeHistogram
for i := 0; i < len(hist); i++ {
hist[i] = newSizeHistogram(2 * chunker.MaxSize)
}
repo.Index().Each(ctx, func(pb restic.PackedBlob) {
err := repo.Index().Each(ctx, func(pb restic.PackedBlob) {
hist[pb.Type].Add(uint64(pb.Length))
})
return hist
return hist, err
}
type sizeClass struct {

View File

@ -122,6 +122,9 @@ func runTag(ctx context.Context, opts TagOptions, gopts GlobalOptions, args []st
changeCnt++
}
}
if ctx.Err() != nil {
return ctx.Err()
}
if changeCnt == 0 {
Verbosef("no snapshots were modified\n")
} else {

View File

@ -43,7 +43,7 @@ import (
"golang.org/x/term"
)
var version = "0.16.4-dev (compiled manually)"
const version = "0.16.4-dev (compiled manually)"
// TimeFormat is the format used for all timestamps printed by restic.
const TimeFormat = "2006-01-02 15:04:05"
@ -96,9 +96,6 @@ var globalOptions = GlobalOptions{
stderr: os.Stderr,
}
var isReadingPassword bool
var internalGlobalCtx context.Context
func init() {
backends := location.NewRegistry()
backends.Register(azure.NewFactory())
@ -112,15 +109,6 @@ func init() {
backends.Register(swift.NewFactory())
globalOptions.backends = backends
var cancel context.CancelFunc
internalGlobalCtx, cancel = context.WithCancel(context.Background())
AddCleanupHandler(func(code int) (int, error) {
// Must be called before the unlock cleanup handler to ensure that the latter is
// not blocked due to limited number of backend connections, see #1434
cancel()
return code, nil
})
f := cmdRoot.PersistentFlags()
f.StringVarP(&globalOptions.Repo, "repo", "r", "", "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", "", "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
@ -165,8 +153,6 @@ func init() {
// parse target pack size from env, on error the default value will be used
targetPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_PACK_SIZE"), 10, 32)
globalOptions.PackSize = uint(targetPackSize)
restoreTerminal()
}
func stdinIsTerminal() bool {
@ -191,40 +177,6 @@ func stdoutTerminalWidth() int {
return w
}
// restoreTerminal installs a cleanup handler that restores the previous
// terminal state on exit. This handler is only intended to restore the
// terminal configuration if restic exits after receiving a signal. A regular
// program execution must revert changes to the terminal configuration itself.
// The terminal configuration is only restored while reading a password.
func restoreTerminal() {
if !term.IsTerminal(int(os.Stdout.Fd())) {
return
}
fd := int(os.Stdout.Fd())
state, err := term.GetState(fd)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to get terminal state: %v\n", err)
return
}
AddCleanupHandler(func(code int) (int, error) {
// Restoring the terminal configuration while restic runs in the
// background, causes restic to get stopped on unix systems with
// a SIGTTOU signal. Thus only restore the terminal settings if
// they might have been modified, which is the case while reading
// a password.
if !isReadingPassword {
return code, nil
}
err := term.Restore(fd, state)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to restore terminal state: %v\n", err)
}
return code, err
})
}
// ClearLine creates a platform dependent string to clear the current
// line, so it can be overwritten.
//
@ -333,24 +285,48 @@ func readPassword(in io.Reader) (password string, err error) {
// readPasswordTerminal reads the password from the given reader which must be a
// tty. Prompt is printed on the writer out before attempting to read the
// password.
func readPasswordTerminal(in *os.File, out io.Writer, prompt string) (password string, err error) {
fmt.Fprint(out, prompt)
isReadingPassword = true
buf, err := term.ReadPassword(int(in.Fd()))
isReadingPassword = false
fmt.Fprintln(out)
// password. If the context is canceled, the function leaks the password reading
// goroutine.
func readPasswordTerminal(ctx context.Context, in *os.File, out *os.File, prompt string) (password string, err error) {
fd := int(out.Fd())
state, err := term.GetState(fd)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to get terminal state: %v\n", err)
return "", err
}
done := make(chan struct{})
var buf []byte
go func() {
defer close(done)
fmt.Fprint(out, prompt)
buf, err = term.ReadPassword(int(in.Fd()))
fmt.Fprintln(out)
}()
select {
case <-ctx.Done():
err := term.Restore(fd, state)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to restore terminal state: %v\n", err)
}
return "", ctx.Err()
case <-done:
// clean shutdown, nothing to do
}
if err != nil {
return "", errors.Wrap(err, "ReadPassword")
}
password = string(buf)
return password, nil
return string(buf), nil
}
// ReadPassword reads the password from a password file, the environment
// variable RESTIC_PASSWORD or prompts the user.
func ReadPassword(opts GlobalOptions, prompt string) (string, error) {
// variable RESTIC_PASSWORD or prompts the user. If the context is canceled,
// the function leaks the password reading goroutine.
func ReadPassword(ctx context.Context, opts GlobalOptions, prompt string) (string, error) {
if opts.password != "" {
return opts.password, nil
}
@ -361,7 +337,7 @@ func ReadPassword(opts GlobalOptions, prompt string) (string, error) {
)
if stdinIsTerminal() {
password, err = readPasswordTerminal(os.Stdin, os.Stderr, prompt)
password, err = readPasswordTerminal(ctx, os.Stdin, os.Stderr, prompt)
} else {
password, err = readPassword(os.Stdin)
Verbosef("reading repository password from stdin\n")
@ -379,14 +355,15 @@ func ReadPassword(opts GlobalOptions, prompt string) (string, error) {
}
// ReadPasswordTwice calls ReadPassword two times and returns an error when the
// passwords don't match.
func ReadPasswordTwice(gopts GlobalOptions, prompt1, prompt2 string) (string, error) {
pw1, err := ReadPassword(gopts, prompt1)
// passwords don't match. If the context is canceled, the function leaks the
// password reading goroutine.
func ReadPasswordTwice(ctx context.Context, gopts GlobalOptions, prompt1, prompt2 string) (string, error) {
pw1, err := ReadPassword(ctx, gopts, prompt1)
if err != nil {
return "", err
}
if stdinIsTerminal() {
pw2, err := ReadPassword(gopts, prompt2)
pw2, err := ReadPassword(ctx, gopts, prompt2)
if err != nil {
return "", err
}
@ -469,7 +446,10 @@ func OpenRepository(ctx context.Context, opts GlobalOptions) (*repository.Reposi
}
for ; passwordTriesLeft > 0; passwordTriesLeft-- {
opts.password, err = ReadPassword(opts, "enter password for repository: ")
opts.password, err = ReadPassword(ctx, opts, "enter password for repository: ")
if ctx.Err() != nil {
return nil, ctx.Err()
}
if err != nil && passwordTriesLeft > 1 {
opts.password = ""
fmt.Printf("%s. Try again\n", err)
@ -570,16 +550,13 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
return cfg, nil
}
// Open the backend specified by a location config.
func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
func innerOpen(ctx context.Context, s string, gopts GlobalOptions, opts options.Options, create bool) (backend.Backend, error) {
debug.Log("parsing location %v", location.StripPassword(gopts.backends, s))
loc, err := location.Parse(gopts.backends, s)
if err != nil {
return nil, errors.Fatalf("parsing repository location failed: %v", err)
}
var be backend.Backend
cfg, err := parseConfig(loc, opts)
if err != nil {
return nil, err
@ -599,7 +576,13 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
}
be, err = factory.Open(ctx, cfg, rt, lim)
var be backend.Backend
if create {
be, err = factory.Create(ctx, cfg, rt, lim)
} else {
be, err = factory.Open(ctx, cfg, rt, lim)
}
if err != nil {
return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(gopts.backends, s), err)
}
@ -615,6 +598,17 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
}
}
return be, nil
}
// Open the backend specified by a location config.
func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
be, err := innerOpen(ctx, s, gopts, opts, false)
if err != nil {
return nil, err
}
// check if config is there
fi, err := be.Stat(ctx, backend.Handle{Type: restic.ConfigFile})
if err != nil {
@ -630,31 +624,5 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
// Create the backend specified by URI.
func create(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
debug.Log("parsing location %v", location.StripPassword(gopts.backends, s))
loc, err := location.Parse(gopts.backends, s)
if err != nil {
return nil, err
}
cfg, err := parseConfig(loc, opts)
if err != nil {
return nil, err
}
rt, err := backend.Transport(globalOptions.TransportOptions)
if err != nil {
return nil, errors.Fatal(err.Error())
}
factory := gopts.backends.Lookup(loc.Scheme)
if factory == nil {
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
}
be, err := factory.Create(ctx, cfg, rt, nil)
if err != nil {
return nil, err
}
return logger.New(sema.NewBackend(be)), nil
return innerOpen(ctx, s, gopts, opts, true)
}

View File

@ -15,23 +15,28 @@ import (
"github.com/pkg/profile"
)
var (
listenProfile string
memProfilePath string
cpuProfilePath string
traceProfilePath string
blockProfilePath string
insecure bool
)
type ProfileOptions struct {
listen string
memPath string
cpuPath string
tracePath string
blockPath string
insecure bool
}
var profileOpts ProfileOptions
var prof interface {
Stop()
}
func init() {
f := cmdRoot.PersistentFlags()
f.StringVar(&listenProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`")
f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`")
f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`")
f.StringVar(&blockProfilePath, "block-profile", "", "write block profile to `dir`")
f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings")
f.StringVar(&profileOpts.listen, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&profileOpts.memPath, "mem-profile", "", "write memory profile to `dir`")
f.StringVar(&profileOpts.cpuPath, "cpu-profile", "", "write cpu profile to `dir`")
f.StringVar(&profileOpts.tracePath, "trace-profile", "", "write trace to `dir`")
f.StringVar(&profileOpts.blockPath, "block-profile", "", "write block profile to `dir`")
f.BoolVar(&profileOpts.insecure, "insecure-kdf", false, "use insecure KDF settings")
}
type fakeTestingTB struct{}
@ -41,10 +46,10 @@ func (fakeTestingTB) Logf(msg string, args ...interface{}) {
}
func runDebug() error {
if listenProfile != "" {
fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", listenProfile)
if profileOpts.listen != "" {
fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", profileOpts.listen)
go func() {
err := http.ListenAndServe(listenProfile, nil)
err := http.ListenAndServe(profileOpts.listen, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err)
}
@ -52,16 +57,16 @@ func runDebug() error {
}
profilesEnabled := 0
if memProfilePath != "" {
if profileOpts.memPath != "" {
profilesEnabled++
}
if cpuProfilePath != "" {
if profileOpts.cpuPath != "" {
profilesEnabled++
}
if traceProfilePath != "" {
if profileOpts.tracePath != "" {
profilesEnabled++
}
if blockProfilePath != "" {
if profileOpts.blockPath != "" {
profilesEnabled++
}
@ -69,30 +74,25 @@ func runDebug() error {
return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time")
}
var prof interface {
Stop()
if profileOpts.memPath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.MemProfile, profile.ProfilePath(profileOpts.memPath))
} else if profileOpts.cpuPath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(profileOpts.cpuPath))
} else if profileOpts.tracePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(profileOpts.tracePath))
} else if profileOpts.blockPath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(profileOpts.blockPath))
}
if memProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.MemProfile, profile.ProfilePath(memProfilePath))
} else if cpuProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath))
} else if traceProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath))
} else if blockProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(blockProfilePath))
}
if prof != nil {
AddCleanupHandler(func(code int) (int, error) {
prof.Stop()
return code, nil
})
}
if insecure {
if profileOpts.insecure {
repository.TestUseLowSecurityKDFParameters(fakeTestingTB{})
}
return nil
}
func stopDebug() {
if prof != nil {
prof.Stop()
}
}

View File

@ -5,3 +5,6 @@ package main
// runDebug is a noop without the debug tag.
func runDebug() error { return nil }
// stopDebug is a noop without the debug tag.
func stopDebug() {}

View File

@ -252,11 +252,11 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
}))
return treePacks
}
@ -280,11 +280,11 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
}))
// remove all packs containing data blobs
rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {

View File

@ -21,18 +21,11 @@ func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun boo
Verbosef("%s", msg)
}
}, Warnf)
unlock = lock.Unlock
// make sure that a repository is unlocked properly and after cancel() was
// called by the cleanup handler in global.go
AddCleanupHandler(func(code int) (int, error) {
lock.Unlock()
return code, nil
})
if err != nil {
return nil, nil, nil, err
}
unlock = lock.Unlock
} else {
repo.SetDryRun()
}

View File

@ -3,6 +3,7 @@ package main
import (
"bufio"
"bytes"
"context"
"fmt"
"log"
"os"
@ -24,6 +25,8 @@ func init() {
_, _ = maxprocs.Set()
}
var ErrOK = errors.New("ok")
// cmdRoot is the base command when no other command has been specified.
var cmdRoot = &cobra.Command{
Use: "restic",
@ -74,6 +77,9 @@ The full documentation can be found at https://restic.readthedocs.io/ .
// enabled)
return runDebug()
},
PersistentPostRun: func(_ *cobra.Command, _ []string) {
stopDebug()
},
}
// Distinguish commands that need the password from those that work without,
@ -88,8 +94,6 @@ func needsPassword(cmd string) bool {
}
}
var logBuffer = bytes.NewBuffer(nil)
func tweakGoGC() {
// lower GOGC from 100 to 50, unless it was manually overwritten by the user
oldValue := godebug.SetGCPercent(50)
@ -102,6 +106,7 @@ func main() {
tweakGoGC()
// install custom global logger into a buffer, if an error occurs
// we can show the logs
logBuffer := bytes.NewBuffer(nil)
log.SetOutput(logBuffer)
err := feature.Flag.Apply(os.Getenv("RESTIC_FEATURES"), func(s string) {
@ -115,7 +120,16 @@ func main() {
debug.Log("main %#v", os.Args)
debug.Log("restic %s compiled with %v on %v/%v",
version, runtime.Version(), runtime.GOOS, runtime.GOARCH)
err = cmdRoot.ExecuteContext(internalGlobalCtx)
ctx := createGlobalContext()
err = cmdRoot.ExecuteContext(ctx)
if err == nil {
err = ctx.Err()
} else if err == ErrOK {
// ErrOK overwrites context cancelation errors
err = nil
}
switch {
case restic.IsAlreadyLocked(err):
@ -137,11 +151,13 @@ func main() {
}
var exitCode int
switch err {
case nil:
switch {
case err == nil:
exitCode = 0
case ErrInvalidSourceData:
case err == ErrInvalidSourceData:
exitCode = 3
case errors.Is(err, context.Canceled):
exitCode = 130
default:
exitCode = 1
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"os"
"github.com/restic/restic/internal/errors"
@ -56,7 +57,7 @@ func initSecondaryRepoOptions(f *pflag.FlagSet, opts *secondaryRepoOptions, repo
opts.PasswordCommand = os.Getenv("RESTIC_FROM_PASSWORD_COMMAND")
}
func fillSecondaryGlobalOpts(opts secondaryRepoOptions, gopts GlobalOptions, repoPrefix string) (GlobalOptions, bool, error) {
func fillSecondaryGlobalOpts(ctx context.Context, opts secondaryRepoOptions, gopts GlobalOptions, repoPrefix string) (GlobalOptions, bool, error) {
if opts.Repo == "" && opts.RepositoryFile == "" && opts.LegacyRepo == "" && opts.LegacyRepositoryFile == "" {
return GlobalOptions{}, false, errors.Fatal("Please specify a source repository location (--from-repo or --from-repository-file)")
}
@ -109,7 +110,7 @@ func fillSecondaryGlobalOpts(opts secondaryRepoOptions, gopts GlobalOptions, rep
return GlobalOptions{}, false, err
}
}
dstGopts.password, err = ReadPassword(dstGopts, "enter password for "+repoPrefix+" repository: ")
dstGopts.password, err = ReadPassword(ctx, dstGopts, "enter password for "+repoPrefix+" repository: ")
if err != nil {
return GlobalOptions{}, false, err
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"os"
"path/filepath"
"testing"
@ -170,7 +171,7 @@ func TestFillSecondaryGlobalOpts(t *testing.T) {
// Test all valid cases
for _, testCase := range validSecondaryRepoTestCases {
DstGOpts, isFromRepo, err := fillSecondaryGlobalOpts(testCase.Opts, gOpts, "destination")
DstGOpts, isFromRepo, err := fillSecondaryGlobalOpts(context.TODO(), testCase.Opts, gOpts, "destination")
rtest.OK(t, err)
rtest.Equals(t, DstGOpts, testCase.DstGOpts)
rtest.Equals(t, isFromRepo, testCase.FromRepo)
@ -178,7 +179,7 @@ func TestFillSecondaryGlobalOpts(t *testing.T) {
// Test all invalid cases
for _, testCase := range invalidSecondaryRepoTestCases {
_, _, err := fillSecondaryGlobalOpts(testCase.Opts, gOpts, "destination")
_, _, err := fillSecondaryGlobalOpts(context.TODO(), testCase.Opts, gOpts, "destination")
rtest.Assert(t, err != nil, "Expected error, but function did not return an error")
}
}

View File

@ -303,7 +303,7 @@ func generateFiles() {
}
}
var versionPattern = `var version = ".*"`
var versionPattern = `const version = ".*"`
const versionCodeFile = "cmd/restic/global.go"
@ -313,7 +313,7 @@ func updateVersion() {
die("unable to write version to file: %v", err)
}
newVersion := fmt.Sprintf("var version = %q", opts.Version)
newVersion := fmt.Sprintf("const version = %q", opts.Version)
replace(versionCodeFile, versionPattern, newVersion)
if len(uncommittedChanges("VERSION")) > 0 || len(uncommittedChanges(versionCodeFile)) > 0 {
@ -323,7 +323,7 @@ func updateVersion() {
}
func updateVersionDev() {
newVersion := fmt.Sprintf(`var version = "%s-dev (compiled manually)"`, opts.Version)
newVersion := fmt.Sprintf(`const version = "%s-dev (compiled manually)"`, opts.Version)
replace(versionCodeFile, versionPattern, newVersion)
msg("committing cmd/restic/global.go with dev version")

View File

@ -380,6 +380,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult {
return res
}
case <-ctx.Done():
return futureNodeResult{err: ctx.Err()}
}
return futureNodeResult{err: errors.Errorf("no result")}
}

View File

@ -90,6 +90,10 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
// return the error if it wasn't ignored
if fnr.err != nil {
debug.Log("err for %v: %v", fnr.snPath, fnr.err)
if fnr.err == context.Canceled {
return nil, stats, fnr.err
}
fnr.err = s.errFn(fnr.target, fnr.err)
if fnr.err == nil {
// ignore error

View File

@ -106,9 +106,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error {
return err
}
func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID]restic.BlobType {
func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) {
packs := make(map[restic.ID]restic.BlobType)
idx.Each(ctx, func(pb restic.PackedBlob) {
err := idx.Each(ctx, func(pb restic.PackedBlob) {
tpe, exists := packs[pb.PackID]
if exists {
if pb.Type != tpe {
@ -119,7 +119,7 @@ func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID
}
packs[pb.PackID] = tpe
})
return packs
return packs, err
}
// LoadIndex loads all index files.
@ -169,7 +169,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
debug.Log("process blobs")
cnt := 0
index.Each(ctx, func(blob restic.PackedBlob) {
err = index.Each(ctx, func(blob restic.PackedBlob) {
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
@ -179,7 +179,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
})
debug.Log("%d blobs processed", cnt)
return nil
return err
})
if err != nil {
errs = append(errs, err)
@ -193,8 +193,14 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
}
// compute pack size using index entries
c.packs = pack.Size(ctx, c.masterIndex, false)
packTypes := computePackTypes(ctx, c.masterIndex)
c.packs, err = pack.Size(ctx, c.masterIndex, false)
if err != nil {
return hints, append(errs, err)
}
packTypes, err := computePackTypes(ctx, c.masterIndex)
if err != nil {
return hints, append(errs, err)
}
debug.Log("checking for duplicate packs")
for packID := range c.packs {
@ -484,7 +490,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
}
// UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, err error) {
if !c.trackUnused {
panic("only works when tracking blob references")
}
@ -495,7 +501,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h)
@ -503,7 +509,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
}
})
return blobs
return blobs, err
}
// CountPacks returns the number of packs in the repository.
@ -561,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
@ -647,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
return nil
}
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}
const maxStreamBufferSize = 4 * 1024 * 1024
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
@ -669,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// run workers
for i := 0; i < workerCount; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)

View File

@ -180,7 +180,8 @@ func TestUnreferencedBlobs(t *testing.T) {
test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr))
blobs := chkr.UnusedBlobs(context.TODO())
blobs, err := chkr.UnusedBlobs(context.TODO())
test.OK(t, err)
sort.Sort(blobs)
test.Equals(t, unusedBlobsBySnapshot, blobs)

View File

@ -43,7 +43,10 @@ func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) {
}
// unused blobs
blobs := chkr.UnusedBlobs(context.TODO())
blobs, err := chkr.UnusedBlobs(context.TODO())
if err != nil {
t.Error(err)
}
if len(blobs) > 0 {
t.Errorf("unused blobs found: %v", blobs)
}

View File

@ -218,7 +218,7 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
idx.m.Lock()
defer idx.m.Unlock()
@ -232,6 +232,7 @@ func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
return true
})
}
return ctx.Err()
}
type EachByPackResult struct {

View File

@ -339,7 +339,7 @@ func TestIndexUnserialize(t *testing.T) {
rtest.Equals(t, oldIdx, idx.Supersedes())
blobs := listPack(idx, exampleLookupTest.packID)
blobs := listPack(t, idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
}
@ -356,12 +356,12 @@ func TestIndexUnserialize(t *testing.T) {
}
}
func listPack(idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
func listPack(t testing.TB, idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
})
}))
return pbs
}

View File

@ -223,13 +223,16 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) {
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
idx.Each(ctx, fn)
if err := idx.Each(ctx, fn); err != nil {
return err
}
}
return nil
}
// MergeFinalIndexes merges all final indexes together.
@ -320,6 +323,9 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, exclude
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
}
err := newIndex.AddToSupersedes(extraObsolete...)
@ -426,10 +432,6 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
defer close(out)
// only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
if ctx.Err() != nil {
return
}
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
@ -439,11 +441,14 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 {
continue
}
mi.Each(ctx, func(pb restic.PackedBlob) {
err := mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
if err != nil {
return
}
// pass on packs
for packID, pbs := range packBlob {

View File

@ -166,9 +166,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, 1, idxCount)
blobCount := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
})
}))
rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1)
@ -198,9 +198,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
})
}))
rtest.Equals(t, 2, blobCount)
}
@ -319,9 +319,9 @@ func BenchmarkMasterIndexEach(b *testing.B) {
for i := 0; i < b.N; i++ {
entries := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(b, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++
})
}))
}
}

View File

@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
// If onlyHdr is set to true, only the size of the header is returned
// Note that this function only gives correct sizes, if there are no
// duplicates in the index.
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 {
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) {
packSize := make(map[restic.ID]int64)
mi.Each(ctx, func(blob restic.PackedBlob) {
err := mi.Each(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID]
if !ok {
size = headerSize
@ -403,5 +403,5 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
})
return packSize
return packSize, err
}

View File

@ -124,12 +124,15 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
blobCount := keepBlobs.Len()
// when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err := repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
return
}
keepBlobs.Delete(blob.BlobHandle)
})
if err != nil {
return nil, err
}
if keepBlobs.Len() < blobCount/2 {
// replace with copy to shrink map to necessary size if there's a chance to benefit
@ -152,7 +155,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// iterate over all blobs in index to find out which blobs are duplicates
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
idx.Each(ctx, func(blob restic.PackedBlob) {
err := idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
if ok {
@ -166,6 +169,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
usedBlobs[bh] = count
}
})
if err != nil {
return nil, nil, err
}
// Check if all used blobs have been found in index
missingBlobs := restic.NewBlobSet()
@ -188,14 +194,18 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
indexPack := make(map[restic.ID]packInfo)
// save computed pack header size
for pid, hdrSize := range pack.Size(ctx, idx, true) {
sz, err := pack.Size(ctx, idx, true)
if err != nil {
return nil, nil, err
}
for pid, hdrSize := range sz {
// initialize tpe with NumBlobTypes to indicate it's not set
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
}
hasDuplicates := false
// iterate over all blobs in index to generate packInfo
idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.Each(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID]
// Set blob type if not yet set
@ -240,6 +250,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack
indexPack[blob.PackID] = ip
})
if err != nil {
return nil, nil, err
}
// if duplicate blobs exist, those will be set to either "used" or "unused":
// - mark only one occurrence of duplicate blobs as used
@ -247,7 +260,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if hasDuplicates {
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
// skip non-duplicate, aka. normal blobs
@ -285,6 +298,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack
indexPack[blob.PackID] = ip
})
if err != nil {
return nil, nil, err
}
}
// Sanity check. If no duplicates exist, all blobs have value 1. After handling
@ -528,6 +544,9 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
printer.P("deleting unreferenced packs\n")
_ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer)
}
if ctx.Err() != nil {
return ctx.Err()
}
if len(plan.repackPacks) != 0 {
printer.P("repacking packs\n")
@ -578,6 +597,9 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
printer.P("removing %d old packs\n", len(plan.removePacks))
_ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer)
}
if ctx.Err() != nil {
return ctx.Err()
}
if plan.opts.UnsafeRecovery {
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer)

View File

@ -72,20 +72,15 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return wgCtx.Err()
}
}
return nil
return wgCtx.Err()
})
worker := func() error {
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
// a required blob couldn't be retrieved
return err
}
keepMutex.Lock()

View File

@ -54,7 +54,10 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
if err != nil {
return err
}
packSizeFromIndex = pack.Size(ctx, repo.Index(), false)
packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false)
if err != nil {
return err
}
}
printer.P("getting pack files to read...\n")

View File

@ -17,7 +17,7 @@ import (
func listBlobs(repo restic.Repository) restic.BlobSet {
blobs := restic.NewBlobSet()
repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
_ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
blobs.Insert(pb.BlobHandle)
})
return blobs

View File

@ -1,7 +1,6 @@
package repository
import (
"bufio"
"bytes"
"context"
"fmt"
@ -12,7 +11,6 @@ import (
"sort"
"sync"
"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/zstd"
"github.com/restic/chunker"
"github.com/restic/restic/internal/backend"
@ -29,8 +27,6 @@ import (
"golang.org/x/sync/errgroup"
)
const MaxStreamBufferSize = 4 * 1024 * 1024
const MinPackSize = 4 * 1024 * 1024
const DefaultPackSize = 16 * 1024 * 1024
const MaxPackSize = 128 * 1024 * 1024
@ -704,15 +700,21 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
defer cancel()
invalidIndex := false
r.idx.Each(ctx, func(blob restic.PackedBlob) {
err := r.idx.Each(ctx, func(blob restic.PackedBlob) {
if blob.IsCompressed() {
invalidIndex = true
}
})
if err != nil {
return err
}
if invalidIndex {
return errors.New("index uses feature not supported by repository version 1")
}
}
if ctx.Err() != nil {
return ctx.Err()
}
// remove index files from the cache which have been removed in the repo
return r.prepareCache()
@ -960,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
}
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)
// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// Skip sections with more than 1MB unused blobs
const maxUnusedRange = 1 * 1024 * 1024
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it.
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
}
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
@ -984,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lowerIdx := 0
lastPos := blobs[0].Offset
const maxChunkSize = 2 * DefaultPackSize
for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID)
}
chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
split := false
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
split = true
}
// skip too large gaps as a new request is typically much cheaper than data transfers
if blobs[i].Offset-lastPos > maxUnusedRange {
split = true
}
if split {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
@ -1000,10 +1019,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
}
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
dataStart := blobs[0].Offset
@ -1011,57 +1030,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
return backoff.Permanent(err)
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return nil
data := make([]byte, int(dataEnd-dataStart))
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
_, cerr := io.ReadFull(rd, data)
return cerr
})
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
// the context is only still valid if handleBlobFn never returned an error
if loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for _, entry := range blobs {
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
err = handleBlobFn(entry.BlobHandle, buf, ierr)
if err != nil {
break
}
}
}
return errors.Wrap(err, "StreamPack")
}
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
}
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
return err
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return errors.Wrap(err, "StreamPack")
}
// discardReader allows the PackBlobIterator to perform zero copy
// reads if the underlying data source is a byte slice.
type discardReader interface {
Discard(n int) (discarded int, err error)
// ReadFull reads the next n bytes into a byte slice. The caller must not
// retain a reference to the byte. Modifications are only allowed within
// the boundaries of the returned slice.
ReadFull(n int) (buf []byte, err error)
}
type byteReader struct {
buf []byte
}
func newByteReader(buf []byte) *byteReader {
return &byteReader{
buf: buf,
}
}
func (b *byteReader) Discard(n int) (discarded int, err error) {
if len(b.buf) < n {
return 0, io.ErrUnexpectedEOF
}
b.buf = b.buf[n:]
return n, nil
}
func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
if len(b.buf) < n {
return nil, io.ErrUnexpectedEOF
}
buf = b.buf[:n]
b.buf = b.buf[n:]
return buf, nil
}
type PackBlobIterator struct {
packID restic.ID
rd *bufio.Reader
rd discardReader
currentOffset uint
blobs []restic.Blob
key *crypto.Key
dec *zstd.Decoder
buf []byte
decode []byte
}
@ -1073,7 +1143,7 @@ type PackBlobValue struct {
var ErrPackEOF = errors.New("reached EOF of pack file")
func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{
packID: packID,
@ -1108,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(b.buf)) < entry.Length {
b.buf = make([]byte, entry.Length)
}
b.buf = b.buf[:entry.Length]
n, err := io.ReadFull(b.rd, b.buf)
buf, err := b.rd.ReadFull(int(entry.Length))
if err != nil {
debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
}
if n != len(b.buf) {
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, b.packID.Str(), len(b.buf), n)
}
b.currentOffset = entry.Offset + entry.Length
if int(entry.Length) <= b.key.NonceSize() {
@ -1131,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
}
// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)

View File

@ -146,14 +146,14 @@ func TestStreamPack(t *testing.T) {
}
func testStreamPack(t *testing.T, version uint) {
// always use the same key for deterministic output
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
dec, err := zstd.NewReader(nil)
if err != nil {
t.Fatal(err)
panic(dec)
}
defer dec.Close()
// always use the same key for deterministic output
key := testKey(t)
blobSizes := []int{
5522811,
@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
loadCalls = 0
shortFirstLoad = test.shortFirstLoad
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
return err
}
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}
@ -449,3 +449,83 @@ func TestUnpackedVerification(t *testing.T) {
}
}
}
func testKey(t *testing.T) crypto.Key {
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
return key
}
func TestStreamPackFallback(t *testing.T) {
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
test := func(t *testing.T, failLoad bool) {
key := testKey(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
plaintext := rtest.Random(800, 42)
blobID := restic.Hash(plaintext)
blobs := []restic.Blob{
{
Length: uint(crypto.CiphertextLength(len(plaintext))),
Offset: 0,
BlobHandle: restic.BlobHandle{
ID: blobID,
Type: restic.DataBlob,
},
},
}
var loadPack backendLoadFn
if failLoad {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return errors.New("load error")
}
} else {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// just return an empty array to provoke an error
data := make([]byte, length)
return fn(bytes.NewReader(data))
}
}
loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if id == blobID {
return plaintext, nil
}
return nil, errors.New("unknown blob")
}
blobOK := false
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
rtest.OK(t, err)
rtest.Equals(t, blobID, blob.ID)
rtest.Equals(t, plaintext, buf)
blobOK = true
return err
}
err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
rtest.OK(t, err)
rtest.Assert(t, blobOK, "blob failed to load")
}
t.Run("corrupted blob", func(t *testing.T) {
test(t, false)
})
// test fallback for failed pack loading
t.Run("failed load", func(t *testing.T) {
test(t, true)
})
}

View File

@ -370,13 +370,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err)
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{})
}
packEntries[pb.PackID][id] = struct{}{}
})
}))
return nil
})
if err != nil {

View File

@ -103,8 +103,8 @@ type MasterIndex interface {
Lookup(BlobHandle) []PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob))
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob)) error
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error