Compare commits

...

17 Commits

Author SHA1 Message Date
Michael Eischer fbe6f93d4b
Merge 1b9e89b478 into a7b5e09902 2024-04-24 21:52:09 +02:00
Michael Eischer a7b5e09902
Merge pull request #4753 from MichaelEischer/remove-cleanup-handlers
Replace cleanup handlers with context based command cancelation
2024-04-24 21:34:19 +02:00
Michael Eischer 3f9d50865d
Merge pull request #4776 from MichaelEischer/cleanup-backend-open
unify backend open and create
2024-04-24 21:24:27 +02:00
Michael Eischer 5f263752d7 init: also apply limiter for non-HTTP backend 2024-04-24 20:42:30 +02:00
Michael Eischer 484dbb1cf4 get rid of a few global variables 2024-04-22 22:39:33 +02:00
Michael Eischer 940a3159b5 let index.Each() and pack.Size() return error on canceled context
This forces a caller to actually check that the function did complete.
2024-04-22 22:39:32 +02:00
Michael Eischer 31624aeffd Improve command shutdown on context cancellation 2024-04-22 22:31:38 +02:00
Michael Eischer 910927670f mount: fix exit code on cancellation 2024-04-22 22:27:19 +02:00
Michael Eischer 6f2a4dea21 remove global shutdown hook 2024-04-22 22:27:19 +02:00
Michael Eischer 699ef5e9de debug: replace cleanup handler usage in profiling setup 2024-04-22 22:27:19 +02:00
Michael Eischer eb710a28e8 use standalone shutdown hook for readPasswordTerminal
move terminal restoration into readPasswordTerminal
2024-04-22 22:27:19 +02:00
Michael Eischer 86c7909f41 mount: use standalone shutdown hook via goroutine 2024-04-22 22:27:19 +02:00
Michael Eischer 93135dc705 lock: drop cleanup handler 2024-04-22 22:27:19 +02:00
Michael Eischer 21a7cb405c check: replace cleanup handler 2024-04-22 22:27:19 +02:00
Michael Eischer 1b9e89b478 guard http timeouts with a feature flag 2024-04-22 21:41:53 +02:00
Michael Eischer a95608522b backend: enforce backend HTTP requests make progress 2024-04-21 19:23:16 +02:00
Michael Eischer 6c6dceade3 global: unify backend open and create 2024-04-19 22:26:14 +02:00
47 changed files with 568 additions and 339 deletions

View File

@ -1,89 +1,41 @@
package main package main
import ( import (
"context"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
) )
var cleanupHandlers struct { func createGlobalContext() context.Context {
sync.Mutex ctx, cancel := context.WithCancel(context.Background())
list []func(code int) (int, error)
done bool ch := make(chan os.Signal, 1)
ch chan os.Signal go cleanupHandler(ch, cancel)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
return ctx
} }
func init() { // cleanupHandler handles the SIGINT and SIGTERM signals.
cleanupHandlers.ch = make(chan os.Signal, 1) func cleanupHandler(c <-chan os.Signal, cancel context.CancelFunc) {
go CleanupHandler(cleanupHandlers.ch) s := <-c
signal.Notify(cleanupHandlers.ch, syscall.SIGINT, syscall.SIGTERM) debug.Log("signal %v received, cleaning up", s)
} Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)
// AddCleanupHandler adds the function f to the list of cleanup handlers so if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
// that it is executed when all the cleanup handlers are run, e.g. when SIGINT _, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
// is received. _, _ = os.Stderr.WriteString(debug.DumpStacktrace())
func AddCleanupHandler(f func(code int) (int, error)) { _, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()
// reset the done flag for integration tests
cleanupHandlers.done = false
cleanupHandlers.list = append(cleanupHandlers.list, f)
}
// RunCleanupHandlers runs all registered cleanup handlers
func RunCleanupHandlers(code int) int {
cleanupHandlers.Lock()
defer cleanupHandlers.Unlock()
if cleanupHandlers.done {
return code
} }
cleanupHandlers.done = true
for _, f := range cleanupHandlers.list { cancel()
var err error
code, err = f(code)
if err != nil {
Warnf("error in cleanup handler: %v\n", err)
}
}
cleanupHandlers.list = nil
return code
} }
// CleanupHandler handles the SIGINT and SIGTERM signals. // Exit terminates the process with the given exit code.
func CleanupHandler(c <-chan os.Signal) {
for s := range c {
debug.Log("signal %v received, cleaning up", s)
Warnf("%ssignal %v received, cleaning up\n", clearLine(0), s)
if val, _ := os.LookupEnv("RESTIC_DEBUG_STACKTRACE_SIGINT"); val != "" {
_, _ = os.Stderr.WriteString("\n--- STACKTRACE START ---\n\n")
_, _ = os.Stderr.WriteString(debug.DumpStacktrace())
_, _ = os.Stderr.WriteString("\n--- STACKTRACE END ---\n")
}
code := 0
if s == syscall.SIGINT || s == syscall.SIGTERM {
code = 130
} else {
code = 1
}
Exit(code)
}
}
// Exit runs the cleanup handlers and then terminates the process with the
// given exit code.
func Exit(code int) { func Exit(code int) {
code = RunCleanupHandlers(code)
debug.Log("exiting with status code %d", code) debug.Log("exiting with status code %d", code)
os.Exit(code) os.Exit(code)
} }

View File

@ -199,10 +199,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
} }
cleanup := prepareCheckCache(opts, &gopts) cleanup := prepareCheckCache(opts, &gopts)
AddCleanupHandler(func(code int) (int, error) { defer cleanup()
cleanup()
return code, nil
})
if !gopts.NoLock { if !gopts.NoLock {
Verbosef("create exclusive lock for repository\n") Verbosef("create exclusive lock for repository\n")
@ -222,6 +219,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
Verbosef("load indexes\n") Verbosef("load indexes\n")
bar := newIndexProgress(gopts.Quiet, gopts.JSON) bar := newIndexProgress(gopts.Quiet, gopts.JSON)
hints, errs := chkr.LoadIndex(ctx, bar) hints, errs := chkr.LoadIndex(ctx, bar)
if ctx.Err() != nil {
return ctx.Err()
}
errorsFound := false errorsFound := false
suggestIndexRebuild := false suggestIndexRebuild := false
@ -283,6 +283,9 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
if orphanedPacks > 0 { if orphanedPacks > 0 {
Verbosef("%d additional files were found in the repo, which likely contain duplicate data.\nThis is non-critical, you can run `restic prune` to correct this.\n", orphanedPacks) Verbosef("%d additional files were found in the repo, which likely contain duplicate data.\nThis is non-critical, you can run `restic prune` to correct this.\n", orphanedPacks)
} }
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("check snapshots, trees and blobs\n") Verbosef("check snapshots, trees and blobs\n")
errChan = make(chan error) errChan = make(chan error)
@ -316,9 +319,16 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
// Must happen after `errChan` is read from in the above loop to avoid // Must happen after `errChan` is read from in the above loop to avoid
// deadlocking in the case of errors. // deadlocking in the case of errors.
wg.Wait() wg.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
if opts.CheckUnused { if opts.CheckUnused {
for _, id := range chkr.UnusedBlobs(ctx) { unused, err := chkr.UnusedBlobs(ctx)
if err != nil {
return err
}
for _, id := range unused {
Verbosef("unused blob %v\n", id) Verbosef("unused blob %v\n", id)
errorsFound = true errorsFound = true
} }
@ -395,10 +405,13 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
doReadData(packs) doReadData(packs)
} }
if ctx.Err() != nil {
return ctx.Err()
}
if errorsFound { if errorsFound {
return errors.Fatal("repository contains errors") return errors.Fatal("repository contains errors")
} }
Verbosef("no errors were found\n") Verbosef("no errors were found\n")
return nil return nil

View File

@ -53,7 +53,7 @@ func init() {
} }
func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []string) error { func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []string) error {
secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "destination") secondaryGopts, isFromRepo, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "destination")
if err != nil { if err != nil {
return err return err
} }
@ -103,6 +103,9 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
// also consider identical snapshot copies // also consider identical snapshot copies
dstSnapshotByOriginal[*sn.ID()] = append(dstSnapshotByOriginal[*sn.ID()], sn) dstSnapshotByOriginal[*sn.ID()] = append(dstSnapshotByOriginal[*sn.ID()], sn)
} }
if ctx.Err() != nil {
return ctx.Err()
}
// remember already processed trees across all snapshots // remember already processed trees across all snapshots
visitedTrees := restic.NewIDSet() visitedTrees := restic.NewIDSet()
@ -147,7 +150,7 @@ func runCopy(ctx context.Context, opts CopyOptions, gopts GlobalOptions, args []
} }
Verbosef("snapshot %s saved\n", newID.Str()) Verbosef("snapshot %s saved\n", newID.Str())
} }
return nil return ctx.Err()
} }
func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool { func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {

View File

@ -439,7 +439,10 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
if err != errAllPacksFound { if err != errAllPacksFound {
// try to resolve unknown pack ids from the index // try to resolve unknown pack ids from the index
packIDs = f.indexPacksToBlobs(ctx, packIDs) packIDs, err = f.indexPacksToBlobs(ctx, packIDs)
if err != nil {
return err
}
} }
if len(packIDs) > 0 { if len(packIDs) > 0 {
@ -456,13 +459,13 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
return nil return nil
} }
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) map[string]struct{} { func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) (map[string]struct{}, error) {
wctx, cancel := context.WithCancel(ctx) wctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
// remember which packs were found in the index // remember which packs were found in the index
indexPackIDs := make(map[string]struct{}) indexPackIDs := make(map[string]struct{})
f.repo.Index().Each(wctx, func(pb restic.PackedBlob) { err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String() idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries // keep entry in packIDs as Each() returns individual index entries
matchingID := false matchingID := false
@ -481,6 +484,9 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
indexPackIDs[idStr] = struct{}{} indexPackIDs[idStr] = struct{}{}
} }
}) })
if err != nil {
return nil, err
}
for id := range indexPackIDs { for id := range indexPackIDs {
delete(packIDs, id) delete(packIDs, id)
@ -493,7 +499,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
} }
Warnf("some pack files are missing from the repository, getting their blobs from the repository index: %v\n\n", list) Warnf("some pack files are missing from the repository, getting their blobs from the repository index: %v\n\n", list)
} }
return packIDs return packIDs, nil
} }
func (f *Finder) findObjectPack(id string, t restic.BlobType) { func (f *Finder) findObjectPack(id string, t restic.BlobType) {
@ -608,6 +614,9 @@ func runFind(ctx context.Context, opts FindOptions, gopts GlobalOptions, args []
for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, opts.Snapshots) { for sn := range FindFilteredSnapshots(ctx, snapshotLister, repo, &opts.SnapshotFilter, opts.Snapshots) {
filteredSnapshots = append(filteredSnapshots, sn) filteredSnapshots = append(filteredSnapshots, sn)
} }
if ctx.Err() != nil {
return ctx.Err()
}
sort.Slice(filteredSnapshots, func(i, j int) bool { sort.Slice(filteredSnapshots, func(i, j int) bool {
return filteredSnapshots[i].Time.Before(filteredSnapshots[j].Time) return filteredSnapshots[i].Time.Before(filteredSnapshots[j].Time)

View File

@ -188,6 +188,9 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) { for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
snapshots = append(snapshots, sn) snapshots = append(snapshots, sn)
} }
if ctx.Err() != nil {
return ctx.Err()
}
var jsonGroups []*ForgetGroup var jsonGroups []*ForgetGroup
@ -270,6 +273,10 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
} }
} }
if ctx.Err() != nil {
return ctx.Err()
}
if len(removeSnIDs) > 0 { if len(removeSnIDs) > 0 {
if !opts.DryRun { if !opts.DryRun {
bar := printer.NewCounter("files deleted") bar := printer.NewCounter("files deleted")

View File

@ -80,7 +80,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []
return err return err
} }
gopts.password, err = ReadPasswordTwice(gopts, gopts.password, err = ReadPasswordTwice(ctx, gopts,
"enter password for new repository: ", "enter password for new repository: ",
"enter password again: ") "enter password again: ")
if err != nil { if err != nil {
@ -131,7 +131,7 @@ func runInit(ctx context.Context, opts InitOptions, gopts GlobalOptions, args []
func maybeReadChunkerPolynomial(ctx context.Context, opts InitOptions, gopts GlobalOptions) (*chunker.Pol, error) { func maybeReadChunkerPolynomial(ctx context.Context, opts InitOptions, gopts GlobalOptions) (*chunker.Pol, error) {
if opts.CopyChunkerParameters { if opts.CopyChunkerParameters {
otherGopts, _, err := fillSecondaryGlobalOpts(opts.secondaryRepoOptions, gopts, "secondary") otherGopts, _, err := fillSecondaryGlobalOpts(ctx, opts.secondaryRepoOptions, gopts, "secondary")
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -60,7 +60,7 @@ func runKeyAdd(ctx context.Context, gopts GlobalOptions, opts KeyAddOptions, arg
} }
func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyAddOptions) error { func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyAddOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile) pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil { if err != nil {
return err return err
} }
@ -83,7 +83,7 @@ func addKey(ctx context.Context, repo *repository.Repository, gopts GlobalOption
// testKeyNewPassword is used to set a new password during integration testing. // testKeyNewPassword is used to set a new password during integration testing.
var testKeyNewPassword string var testKeyNewPassword string
func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error) { func getNewPassword(ctx context.Context, gopts GlobalOptions, newPasswordFile string) (string, error) {
if testKeyNewPassword != "" { if testKeyNewPassword != "" {
return testKeyNewPassword, nil return testKeyNewPassword, nil
} }
@ -97,7 +97,7 @@ func getNewPassword(gopts GlobalOptions, newPasswordFile string) (string, error)
newopts := gopts newopts := gopts
newopts.password = "" newopts.password = ""
return ReadPasswordTwice(newopts, return ReadPasswordTwice(ctx, newopts,
"enter new password: ", "enter new password: ",
"enter password again: ") "enter password again: ")
} }

View File

@ -57,7 +57,7 @@ func runKeyPasswd(ctx context.Context, gopts GlobalOptions, opts KeyPasswdOption
} }
func changePassword(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyPasswdOptions) error { func changePassword(ctx context.Context, repo *repository.Repository, gopts GlobalOptions, opts KeyPasswdOptions) error {
pw, err := getNewPassword(gopts, opts.NewPasswordFile) pw, err := getNewPassword(ctx, gopts, opts.NewPasswordFile)
if err != nil { if err != nil {
return err return err
} }

View File

@ -59,10 +59,9 @@ func runList(ctx context.Context, gopts GlobalOptions, args []string) error {
if err != nil { if err != nil {
return err return err
} }
idx.Each(ctx, func(blobs restic.PackedBlob) { return idx.Each(ctx, func(blobs restic.PackedBlob) {
Printf("%v %v\n", blobs.Type, blobs.ID) Printf("%v %v\n", blobs.Type, blobs.ID)
}) })
return nil
}) })
default: default:
return errors.Fatal("invalid type") return errors.Fatal("invalid type")

View File

@ -152,28 +152,15 @@ func runMount(ctx context.Context, opts MountOptions, gopts GlobalOptions, args
} }
} }
AddCleanupHandler(func(code int) (int, error) { systemFuse.Debug = func(msg interface{}) {
debug.Log("running umount cleanup handler for mount at %v", mountpoint) debug.Log("fuse: %v", msg)
err := umount(mountpoint) }
if err != nil {
Warnf("unable to umount (maybe already umounted or still in use?): %v\n", err)
}
// replace error code of sigint
if code == 130 {
code = 0
}
return code, nil
})
c, err := systemFuse.Mount(mountpoint, mountOptions...) c, err := systemFuse.Mount(mountpoint, mountOptions...)
if err != nil { if err != nil {
return err return err
} }
systemFuse.Debug = func(msg interface{}) {
debug.Log("fuse: %v", msg)
}
cfg := fuse.Config{ cfg := fuse.Config{
OwnerIsRoot: opts.OwnerRoot, OwnerIsRoot: opts.OwnerRoot,
Filter: opts.SnapshotFilter, Filter: opts.SnapshotFilter,
@ -187,15 +174,26 @@ func runMount(ctx context.Context, opts MountOptions, gopts GlobalOptions, args
Printf("When finished, quit with Ctrl-c here or umount the mountpoint.\n") Printf("When finished, quit with Ctrl-c here or umount the mountpoint.\n")
debug.Log("serving mount at %v", mountpoint) debug.Log("serving mount at %v", mountpoint)
err = fs.Serve(c, root)
if err != nil { done := make(chan struct{})
return err
go func() {
defer close(done)
err = fs.Serve(c, root)
}()
select {
case <-ctx.Done():
debug.Log("running umount cleanup handler for mount at %v", mountpoint)
err := systemFuse.Unmount(mountpoint)
if err != nil {
Warnf("unable to umount (maybe already umounted or still in use?): %v\n", err)
}
return ErrOK
case <-done:
// clean shutdown, nothing to do
} }
<-c.Ready return err
return c.MountError
}
func umount(mountpoint string) error {
return systemFuse.Unmount(mountpoint)
} }

View File

@ -12,6 +12,7 @@ import (
"testing" "testing"
"time" "time"
systemFuse "github.com/anacrolix/fuse"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
) )
@ -65,7 +66,7 @@ func testRunMount(t testing.TB, gopts GlobalOptions, dir string, wg *sync.WaitGr
func testRunUmount(t testing.TB, dir string) { func testRunUmount(t testing.TB, dir string) {
var err error var err error
for i := 0; i < mountWait; i++ { for i := 0; i < mountWait; i++ {
if err = umount(dir); err == nil { if err = systemFuse.Unmount(dir); err == nil {
t.Logf("directory %v umounted", dir) t.Logf("directory %v umounted", dir)
return return
} }

View File

@ -197,6 +197,9 @@ func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOption
if err != nil { if err != nil {
return err return err
} }
if ctx.Err() != nil {
return ctx.Err()
}
if popts.DryRun { if popts.DryRun {
printer.P("\nWould have made the following changes:") printer.P("\nWould have made the following changes:")

View File

@ -61,16 +61,22 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
// tree. If it is not referenced, we have a root tree. // tree. If it is not referenced, we have a root tree.
trees := make(map[restic.ID]bool) trees := make(map[restic.ID]bool)
repo.Index().Each(ctx, func(blob restic.PackedBlob) { err = repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if blob.Type == restic.TreeBlob { if blob.Type == restic.TreeBlob {
trees[blob.Blob.ID] = false trees[blob.Blob.ID] = false
} }
}) })
if err != nil {
return err
}
Verbosef("load %d trees\n", len(trees)) Verbosef("load %d trees\n", len(trees))
bar = newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded") bar = newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded")
for id := range trees { for id := range trees {
tree, err := restic.LoadTree(ctx, repo, id) tree, err := restic.LoadTree(ctx, repo, id)
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil { if err != nil {
Warnf("unable to load tree %v: %v\n", id.Str(), err) Warnf("unable to load tree %v: %v\n", id.Str(), err)
continue continue

View File

@ -145,6 +145,9 @@ func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOpt
changedCount++ changedCount++
} }
} }
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("\n") Verbosef("\n")
if changedCount == 0 { if changedCount == 0 {

View File

@ -294,6 +294,9 @@ func runRewrite(ctx context.Context, opts RewriteOptions, gopts GlobalOptions, a
changedCount++ changedCount++
} }
} }
if ctx.Err() != nil {
return ctx.Err()
}
Verbosef("\n") Verbosef("\n")
if changedCount == 0 { if changedCount == 0 {

View File

@ -69,6 +69,9 @@ func runSnapshots(ctx context.Context, opts SnapshotOptions, gopts GlobalOptions
for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) { for sn := range FindFilteredSnapshots(ctx, repo, repo, &opts.SnapshotFilter, args) {
snapshots = append(snapshots, sn) snapshots = append(snapshots, sn)
} }
if ctx.Err() != nil {
return ctx.Err()
}
snapshotGroups, grouped, err := restic.GroupSnapshots(snapshots, opts.GroupBy) snapshotGroups, grouped, err := restic.GroupSnapshots(snapshots, opts.GroupBy)
if err != nil { if err != nil {
return err return err

View File

@ -117,9 +117,8 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args
return fmt.Errorf("error walking snapshot: %v", err) return fmt.Errorf("error walking snapshot: %v", err)
} }
} }
if ctx.Err() != nil {
if err != nil { return ctx.Err()
return err
} }
if opts.countMode == countModeRawData { if opts.countMode == countModeRawData {
@ -352,7 +351,10 @@ func statsDebug(ctx context.Context, repo restic.Repository) error {
Warnf("File Type: %v\n%v\n", t, hist) Warnf("File Type: %v\n%v\n", t, hist)
} }
hist := statsDebugBlobs(ctx, repo) hist, err := statsDebugBlobs(ctx, repo)
if err != nil {
return err
}
for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} { for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} {
Warnf("Blob Type: %v\n%v\n\n", t, hist[t]) Warnf("Blob Type: %v\n%v\n\n", t, hist[t])
} }
@ -370,17 +372,17 @@ func statsDebugFileType(ctx context.Context, repo restic.Lister, tpe restic.File
return hist, err return hist, err
} }
func statsDebugBlobs(ctx context.Context, repo restic.Repository) [restic.NumBlobTypes]*sizeHistogram { func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBlobTypes]*sizeHistogram, error) {
var hist [restic.NumBlobTypes]*sizeHistogram var hist [restic.NumBlobTypes]*sizeHistogram
for i := 0; i < len(hist); i++ { for i := 0; i < len(hist); i++ {
hist[i] = newSizeHistogram(2 * chunker.MaxSize) hist[i] = newSizeHistogram(2 * chunker.MaxSize)
} }
repo.Index().Each(ctx, func(pb restic.PackedBlob) { err := repo.Index().Each(ctx, func(pb restic.PackedBlob) {
hist[pb.Type].Add(uint64(pb.Length)) hist[pb.Type].Add(uint64(pb.Length))
}) })
return hist return hist, err
} }
type sizeClass struct { type sizeClass struct {

View File

@ -122,6 +122,9 @@ func runTag(ctx context.Context, opts TagOptions, gopts GlobalOptions, args []st
changeCnt++ changeCnt++
} }
} }
if ctx.Err() != nil {
return ctx.Err()
}
if changeCnt == 0 { if changeCnt == 0 {
Verbosef("no snapshots were modified\n") Verbosef("no snapshots were modified\n")
} else { } else {

View File

@ -43,7 +43,7 @@ import (
"golang.org/x/term" "golang.org/x/term"
) )
var version = "0.16.4-dev (compiled manually)" const version = "0.16.4-dev (compiled manually)"
// TimeFormat is the format used for all timestamps printed by restic. // TimeFormat is the format used for all timestamps printed by restic.
const TimeFormat = "2006-01-02 15:04:05" const TimeFormat = "2006-01-02 15:04:05"
@ -96,9 +96,6 @@ var globalOptions = GlobalOptions{
stderr: os.Stderr, stderr: os.Stderr,
} }
var isReadingPassword bool
var internalGlobalCtx context.Context
func init() { func init() {
backends := location.NewRegistry() backends := location.NewRegistry()
backends.Register(azure.NewFactory()) backends.Register(azure.NewFactory())
@ -112,15 +109,6 @@ func init() {
backends.Register(swift.NewFactory()) backends.Register(swift.NewFactory())
globalOptions.backends = backends globalOptions.backends = backends
var cancel context.CancelFunc
internalGlobalCtx, cancel = context.WithCancel(context.Background())
AddCleanupHandler(func(code int) (int, error) {
// Must be called before the unlock cleanup handler to ensure that the latter is
// not blocked due to limited number of backend connections, see #1434
cancel()
return code, nil
})
f := cmdRoot.PersistentFlags() f := cmdRoot.PersistentFlags()
f.StringVarP(&globalOptions.Repo, "repo", "r", "", "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)") f.StringVarP(&globalOptions.Repo, "repo", "r", "", "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)")
f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", "", "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)") f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", "", "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)")
@ -165,8 +153,6 @@ func init() {
// parse target pack size from env, on error the default value will be used // parse target pack size from env, on error the default value will be used
targetPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_PACK_SIZE"), 10, 32) targetPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_PACK_SIZE"), 10, 32)
globalOptions.PackSize = uint(targetPackSize) globalOptions.PackSize = uint(targetPackSize)
restoreTerminal()
} }
func stdinIsTerminal() bool { func stdinIsTerminal() bool {
@ -191,40 +177,6 @@ func stdoutTerminalWidth() int {
return w return w
} }
// restoreTerminal installs a cleanup handler that restores the previous
// terminal state on exit. This handler is only intended to restore the
// terminal configuration if restic exits after receiving a signal. A regular
// program execution must revert changes to the terminal configuration itself.
// The terminal configuration is only restored while reading a password.
func restoreTerminal() {
if !term.IsTerminal(int(os.Stdout.Fd())) {
return
}
fd := int(os.Stdout.Fd())
state, err := term.GetState(fd)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to get terminal state: %v\n", err)
return
}
AddCleanupHandler(func(code int) (int, error) {
// Restoring the terminal configuration while restic runs in the
// background, causes restic to get stopped on unix systems with
// a SIGTTOU signal. Thus only restore the terminal settings if
// they might have been modified, which is the case while reading
// a password.
if !isReadingPassword {
return code, nil
}
err := term.Restore(fd, state)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to restore terminal state: %v\n", err)
}
return code, err
})
}
// ClearLine creates a platform dependent string to clear the current // ClearLine creates a platform dependent string to clear the current
// line, so it can be overwritten. // line, so it can be overwritten.
// //
@ -333,24 +285,48 @@ func readPassword(in io.Reader) (password string, err error) {
// readPasswordTerminal reads the password from the given reader which must be a // readPasswordTerminal reads the password from the given reader which must be a
// tty. Prompt is printed on the writer out before attempting to read the // tty. Prompt is printed on the writer out before attempting to read the
// password. // password. If the context is canceled, the function leaks the password reading
func readPasswordTerminal(in *os.File, out io.Writer, prompt string) (password string, err error) { // goroutine.
fmt.Fprint(out, prompt) func readPasswordTerminal(ctx context.Context, in *os.File, out *os.File, prompt string) (password string, err error) {
isReadingPassword = true fd := int(out.Fd())
buf, err := term.ReadPassword(int(in.Fd())) state, err := term.GetState(fd)
isReadingPassword = false if err != nil {
fmt.Fprintln(out) fmt.Fprintf(os.Stderr, "unable to get terminal state: %v\n", err)
return "", err
}
done := make(chan struct{})
var buf []byte
go func() {
defer close(done)
fmt.Fprint(out, prompt)
buf, err = term.ReadPassword(int(in.Fd()))
fmt.Fprintln(out)
}()
select {
case <-ctx.Done():
err := term.Restore(fd, state)
if err != nil {
fmt.Fprintf(os.Stderr, "unable to restore terminal state: %v\n", err)
}
return "", ctx.Err()
case <-done:
// clean shutdown, nothing to do
}
if err != nil { if err != nil {
return "", errors.Wrap(err, "ReadPassword") return "", errors.Wrap(err, "ReadPassword")
} }
password = string(buf) return string(buf), nil
return password, nil
} }
// ReadPassword reads the password from a password file, the environment // ReadPassword reads the password from a password file, the environment
// variable RESTIC_PASSWORD or prompts the user. // variable RESTIC_PASSWORD or prompts the user. If the context is canceled,
func ReadPassword(opts GlobalOptions, prompt string) (string, error) { // the function leaks the password reading goroutine.
func ReadPassword(ctx context.Context, opts GlobalOptions, prompt string) (string, error) {
if opts.password != "" { if opts.password != "" {
return opts.password, nil return opts.password, nil
} }
@ -361,7 +337,7 @@ func ReadPassword(opts GlobalOptions, prompt string) (string, error) {
) )
if stdinIsTerminal() { if stdinIsTerminal() {
password, err = readPasswordTerminal(os.Stdin, os.Stderr, prompt) password, err = readPasswordTerminal(ctx, os.Stdin, os.Stderr, prompt)
} else { } else {
password, err = readPassword(os.Stdin) password, err = readPassword(os.Stdin)
Verbosef("reading repository password from stdin\n") Verbosef("reading repository password from stdin\n")
@ -379,14 +355,15 @@ func ReadPassword(opts GlobalOptions, prompt string) (string, error) {
} }
// ReadPasswordTwice calls ReadPassword two times and returns an error when the // ReadPasswordTwice calls ReadPassword two times and returns an error when the
// passwords don't match. // passwords don't match. If the context is canceled, the function leaks the
func ReadPasswordTwice(gopts GlobalOptions, prompt1, prompt2 string) (string, error) { // password reading goroutine.
pw1, err := ReadPassword(gopts, prompt1) func ReadPasswordTwice(ctx context.Context, gopts GlobalOptions, prompt1, prompt2 string) (string, error) {
pw1, err := ReadPassword(ctx, gopts, prompt1)
if err != nil { if err != nil {
return "", err return "", err
} }
if stdinIsTerminal() { if stdinIsTerminal() {
pw2, err := ReadPassword(gopts, prompt2) pw2, err := ReadPassword(ctx, gopts, prompt2)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -469,7 +446,10 @@ func OpenRepository(ctx context.Context, opts GlobalOptions) (*repository.Reposi
} }
for ; passwordTriesLeft > 0; passwordTriesLeft-- { for ; passwordTriesLeft > 0; passwordTriesLeft-- {
opts.password, err = ReadPassword(opts, "enter password for repository: ") opts.password, err = ReadPassword(ctx, opts, "enter password for repository: ")
if ctx.Err() != nil {
return nil, ctx.Err()
}
if err != nil && passwordTriesLeft > 1 { if err != nil && passwordTriesLeft > 1 {
opts.password = "" opts.password = ""
fmt.Printf("%s. Try again\n", err) fmt.Printf("%s. Try again\n", err)
@ -570,16 +550,13 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
return cfg, nil return cfg, nil
} }
// Open the backend specified by a location config. func innerOpen(ctx context.Context, s string, gopts GlobalOptions, opts options.Options, create bool) (backend.Backend, error) {
func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
debug.Log("parsing location %v", location.StripPassword(gopts.backends, s)) debug.Log("parsing location %v", location.StripPassword(gopts.backends, s))
loc, err := location.Parse(gopts.backends, s) loc, err := location.Parse(gopts.backends, s)
if err != nil { if err != nil {
return nil, errors.Fatalf("parsing repository location failed: %v", err) return nil, errors.Fatalf("parsing repository location failed: %v", err)
} }
var be backend.Backend
cfg, err := parseConfig(loc, opts) cfg, err := parseConfig(loc, opts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -599,7 +576,13 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme) return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
} }
be, err = factory.Open(ctx, cfg, rt, lim) var be backend.Backend
if create {
be, err = factory.Create(ctx, cfg, rt, lim)
} else {
be, err = factory.Open(ctx, cfg, rt, lim)
}
if err != nil { if err != nil {
return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(gopts.backends, s), err) return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(gopts.backends, s), err)
} }
@ -615,6 +598,17 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
} }
} }
return be, nil
}
// Open the backend specified by a location config.
func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
be, err := innerOpen(ctx, s, gopts, opts, false)
if err != nil {
return nil, err
}
// check if config is there // check if config is there
fi, err := be.Stat(ctx, backend.Handle{Type: restic.ConfigFile}) fi, err := be.Stat(ctx, backend.Handle{Type: restic.ConfigFile})
if err != nil { if err != nil {
@ -630,31 +624,5 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio
// Create the backend specified by URI. // Create the backend specified by URI.
func create(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) { func create(ctx context.Context, s string, gopts GlobalOptions, opts options.Options) (backend.Backend, error) {
debug.Log("parsing location %v", location.StripPassword(gopts.backends, s)) return innerOpen(ctx, s, gopts, opts, true)
loc, err := location.Parse(gopts.backends, s)
if err != nil {
return nil, err
}
cfg, err := parseConfig(loc, opts)
if err != nil {
return nil, err
}
rt, err := backend.Transport(globalOptions.TransportOptions)
if err != nil {
return nil, errors.Fatal(err.Error())
}
factory := gopts.backends.Lookup(loc.Scheme)
if factory == nil {
return nil, errors.Fatalf("invalid backend: %q", loc.Scheme)
}
be, err := factory.Create(ctx, cfg, rt, nil)
if err != nil {
return nil, err
}
return logger.New(sema.NewBackend(be)), nil
} }

View File

@ -15,23 +15,28 @@ import (
"github.com/pkg/profile" "github.com/pkg/profile"
) )
var ( type ProfileOptions struct {
listenProfile string listen string
memProfilePath string memPath string
cpuProfilePath string cpuPath string
traceProfilePath string tracePath string
blockProfilePath string blockPath string
insecure bool insecure bool
) }
var profileOpts ProfileOptions
var prof interface {
Stop()
}
func init() { func init() {
f := cmdRoot.PersistentFlags() f := cmdRoot.PersistentFlags()
f.StringVar(&listenProfile, "listen-profile", "", "listen on this `address:port` for memory profiling") f.StringVar(&profileOpts.listen, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`") f.StringVar(&profileOpts.memPath, "mem-profile", "", "write memory profile to `dir`")
f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`") f.StringVar(&profileOpts.cpuPath, "cpu-profile", "", "write cpu profile to `dir`")
f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`") f.StringVar(&profileOpts.tracePath, "trace-profile", "", "write trace to `dir`")
f.StringVar(&blockProfilePath, "block-profile", "", "write block profile to `dir`") f.StringVar(&profileOpts.blockPath, "block-profile", "", "write block profile to `dir`")
f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings") f.BoolVar(&profileOpts.insecure, "insecure-kdf", false, "use insecure KDF settings")
} }
type fakeTestingTB struct{} type fakeTestingTB struct{}
@ -41,10 +46,10 @@ func (fakeTestingTB) Logf(msg string, args ...interface{}) {
} }
func runDebug() error { func runDebug() error {
if listenProfile != "" { if profileOpts.listen != "" {
fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", listenProfile) fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", profileOpts.listen)
go func() { go func() {
err := http.ListenAndServe(listenProfile, nil) err := http.ListenAndServe(profileOpts.listen, nil)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err) fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err)
} }
@ -52,16 +57,16 @@ func runDebug() error {
} }
profilesEnabled := 0 profilesEnabled := 0
if memProfilePath != "" { if profileOpts.memPath != "" {
profilesEnabled++ profilesEnabled++
} }
if cpuProfilePath != "" { if profileOpts.cpuPath != "" {
profilesEnabled++ profilesEnabled++
} }
if traceProfilePath != "" { if profileOpts.tracePath != "" {
profilesEnabled++ profilesEnabled++
} }
if blockProfilePath != "" { if profileOpts.blockPath != "" {
profilesEnabled++ profilesEnabled++
} }
@ -69,30 +74,25 @@ func runDebug() error {
return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time") return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time")
} }
var prof interface { if profileOpts.memPath != "" {
Stop() prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.MemProfile, profile.ProfilePath(profileOpts.memPath))
} else if profileOpts.cpuPath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(profileOpts.cpuPath))
} else if profileOpts.tracePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(profileOpts.tracePath))
} else if profileOpts.blockPath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(profileOpts.blockPath))
} }
if memProfilePath != "" { if profileOpts.insecure {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.MemProfile, profile.ProfilePath(memProfilePath))
} else if cpuProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath))
} else if traceProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath))
} else if blockProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(blockProfilePath))
}
if prof != nil {
AddCleanupHandler(func(code int) (int, error) {
prof.Stop()
return code, nil
})
}
if insecure {
repository.TestUseLowSecurityKDFParameters(fakeTestingTB{}) repository.TestUseLowSecurityKDFParameters(fakeTestingTB{})
} }
return nil return nil
} }
func stopDebug() {
if prof != nil {
prof.Stop()
}
}

View File

@ -5,3 +5,6 @@ package main
// runDebug is a noop without the debug tag. // runDebug is a noop without the debug tag.
func runDebug() error { return nil } func runDebug() error { return nil }
// stopDebug is a noop without the debug tag.
func stopDebug() {}

View File

@ -252,11 +252,11 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
rtest.OK(t, r.LoadIndex(ctx, nil)) rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet() treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) { rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob { if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID) treePacks.Insert(pb.PackID)
} }
}) }))
return treePacks return treePacks
} }
@ -280,11 +280,11 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
rtest.OK(t, r.LoadIndex(ctx, nil)) rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet() treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) { rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob { if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID) treePacks.Insert(pb.PackID)
} }
}) }))
// remove all packs containing data blobs // remove all packs containing data blobs
rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {

View File

@ -21,18 +21,11 @@ func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun boo
Verbosef("%s", msg) Verbosef("%s", msg)
} }
}, Warnf) }, Warnf)
unlock = lock.Unlock
// make sure that a repository is unlocked properly and after cancel() was
// called by the cleanup handler in global.go
AddCleanupHandler(func(code int) (int, error) {
lock.Unlock()
return code, nil
})
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
unlock = lock.Unlock
} else { } else {
repo.SetDryRun() repo.SetDryRun()
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"context"
"fmt" "fmt"
"log" "log"
"os" "os"
@ -24,6 +25,8 @@ func init() {
_, _ = maxprocs.Set() _, _ = maxprocs.Set()
} }
var ErrOK = errors.New("ok")
// cmdRoot is the base command when no other command has been specified. // cmdRoot is the base command when no other command has been specified.
var cmdRoot = &cobra.Command{ var cmdRoot = &cobra.Command{
Use: "restic", Use: "restic",
@ -74,6 +77,9 @@ The full documentation can be found at https://restic.readthedocs.io/ .
// enabled) // enabled)
return runDebug() return runDebug()
}, },
PersistentPostRun: func(_ *cobra.Command, _ []string) {
stopDebug()
},
} }
// Distinguish commands that need the password from those that work without, // Distinguish commands that need the password from those that work without,
@ -88,8 +94,6 @@ func needsPassword(cmd string) bool {
} }
} }
var logBuffer = bytes.NewBuffer(nil)
func tweakGoGC() { func tweakGoGC() {
// lower GOGC from 100 to 50, unless it was manually overwritten by the user // lower GOGC from 100 to 50, unless it was manually overwritten by the user
oldValue := godebug.SetGCPercent(50) oldValue := godebug.SetGCPercent(50)
@ -102,6 +106,7 @@ func main() {
tweakGoGC() tweakGoGC()
// install custom global logger into a buffer, if an error occurs // install custom global logger into a buffer, if an error occurs
// we can show the logs // we can show the logs
logBuffer := bytes.NewBuffer(nil)
log.SetOutput(logBuffer) log.SetOutput(logBuffer)
err := feature.Flag.Apply(os.Getenv("RESTIC_FEATURES"), func(s string) { err := feature.Flag.Apply(os.Getenv("RESTIC_FEATURES"), func(s string) {
@ -115,7 +120,16 @@ func main() {
debug.Log("main %#v", os.Args) debug.Log("main %#v", os.Args)
debug.Log("restic %s compiled with %v on %v/%v", debug.Log("restic %s compiled with %v on %v/%v",
version, runtime.Version(), runtime.GOOS, runtime.GOARCH) version, runtime.Version(), runtime.GOOS, runtime.GOARCH)
err = cmdRoot.ExecuteContext(internalGlobalCtx)
ctx := createGlobalContext()
err = cmdRoot.ExecuteContext(ctx)
if err == nil {
err = ctx.Err()
} else if err == ErrOK {
// ErrOK overwrites context cancelation errors
err = nil
}
switch { switch {
case restic.IsAlreadyLocked(err): case restic.IsAlreadyLocked(err):
@ -137,11 +151,13 @@ func main() {
} }
var exitCode int var exitCode int
switch err { switch {
case nil: case err == nil:
exitCode = 0 exitCode = 0
case ErrInvalidSourceData: case err == ErrInvalidSourceData:
exitCode = 3 exitCode = 3
case errors.Is(err, context.Canceled):
exitCode = 130
default: default:
exitCode = 1 exitCode = 1
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"os" "os"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
@ -56,7 +57,7 @@ func initSecondaryRepoOptions(f *pflag.FlagSet, opts *secondaryRepoOptions, repo
opts.PasswordCommand = os.Getenv("RESTIC_FROM_PASSWORD_COMMAND") opts.PasswordCommand = os.Getenv("RESTIC_FROM_PASSWORD_COMMAND")
} }
func fillSecondaryGlobalOpts(opts secondaryRepoOptions, gopts GlobalOptions, repoPrefix string) (GlobalOptions, bool, error) { func fillSecondaryGlobalOpts(ctx context.Context, opts secondaryRepoOptions, gopts GlobalOptions, repoPrefix string) (GlobalOptions, bool, error) {
if opts.Repo == "" && opts.RepositoryFile == "" && opts.LegacyRepo == "" && opts.LegacyRepositoryFile == "" { if opts.Repo == "" && opts.RepositoryFile == "" && opts.LegacyRepo == "" && opts.LegacyRepositoryFile == "" {
return GlobalOptions{}, false, errors.Fatal("Please specify a source repository location (--from-repo or --from-repository-file)") return GlobalOptions{}, false, errors.Fatal("Please specify a source repository location (--from-repo or --from-repository-file)")
} }
@ -109,7 +110,7 @@ func fillSecondaryGlobalOpts(opts secondaryRepoOptions, gopts GlobalOptions, rep
return GlobalOptions{}, false, err return GlobalOptions{}, false, err
} }
} }
dstGopts.password, err = ReadPassword(dstGopts, "enter password for "+repoPrefix+" repository: ") dstGopts.password, err = ReadPassword(ctx, dstGopts, "enter password for "+repoPrefix+" repository: ")
if err != nil { if err != nil {
return GlobalOptions{}, false, err return GlobalOptions{}, false, err
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"context"
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
@ -170,7 +171,7 @@ func TestFillSecondaryGlobalOpts(t *testing.T) {
// Test all valid cases // Test all valid cases
for _, testCase := range validSecondaryRepoTestCases { for _, testCase := range validSecondaryRepoTestCases {
DstGOpts, isFromRepo, err := fillSecondaryGlobalOpts(testCase.Opts, gOpts, "destination") DstGOpts, isFromRepo, err := fillSecondaryGlobalOpts(context.TODO(), testCase.Opts, gOpts, "destination")
rtest.OK(t, err) rtest.OK(t, err)
rtest.Equals(t, DstGOpts, testCase.DstGOpts) rtest.Equals(t, DstGOpts, testCase.DstGOpts)
rtest.Equals(t, isFromRepo, testCase.FromRepo) rtest.Equals(t, isFromRepo, testCase.FromRepo)
@ -178,7 +179,7 @@ func TestFillSecondaryGlobalOpts(t *testing.T) {
// Test all invalid cases // Test all invalid cases
for _, testCase := range invalidSecondaryRepoTestCases { for _, testCase := range invalidSecondaryRepoTestCases {
_, _, err := fillSecondaryGlobalOpts(testCase.Opts, gOpts, "destination") _, _, err := fillSecondaryGlobalOpts(context.TODO(), testCase.Opts, gOpts, "destination")
rtest.Assert(t, err != nil, "Expected error, but function did not return an error") rtest.Assert(t, err != nil, "Expected error, but function did not return an error")
} }
} }

View File

@ -303,7 +303,7 @@ func generateFiles() {
} }
} }
var versionPattern = `var version = ".*"` var versionPattern = `const version = ".*"`
const versionCodeFile = "cmd/restic/global.go" const versionCodeFile = "cmd/restic/global.go"
@ -313,7 +313,7 @@ func updateVersion() {
die("unable to write version to file: %v", err) die("unable to write version to file: %v", err)
} }
newVersion := fmt.Sprintf("var version = %q", opts.Version) newVersion := fmt.Sprintf("const version = %q", opts.Version)
replace(versionCodeFile, versionPattern, newVersion) replace(versionCodeFile, versionPattern, newVersion)
if len(uncommittedChanges("VERSION")) > 0 || len(uncommittedChanges(versionCodeFile)) > 0 { if len(uncommittedChanges("VERSION")) > 0 || len(uncommittedChanges(versionCodeFile)) > 0 {
@ -323,7 +323,7 @@ func updateVersion() {
} }
func updateVersionDev() { func updateVersionDev() {
newVersion := fmt.Sprintf(`var version = "%s-dev (compiled manually)"`, opts.Version) newVersion := fmt.Sprintf(`const version = "%s-dev (compiled manually)"`, opts.Version)
replace(versionCodeFile, versionPattern, newVersion) replace(versionCodeFile, versionPattern, newVersion)
msg("committing cmd/restic/global.go with dev version") msg("committing cmd/restic/global.go with dev version")

View File

@ -380,6 +380,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult {
return res return res
} }
case <-ctx.Done(): case <-ctx.Done():
return futureNodeResult{err: ctx.Err()}
} }
return futureNodeResult{err: errors.Errorf("no result")} return futureNodeResult{err: errors.Errorf("no result")}
} }

View File

@ -90,6 +90,10 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
// return the error if it wasn't ignored // return the error if it wasn't ignored
if fnr.err != nil { if fnr.err != nil {
debug.Log("err for %v: %v", fnr.snPath, fnr.err) debug.Log("err for %v: %v", fnr.snPath, fnr.err)
if fnr.err == context.Canceled {
return nil, stats, fnr.err
}
fnr.err = s.errFn(fnr.target, fnr.err) fnr.err = s.errFn(fnr.target, fnr.err)
if fnr.err == nil { if fnr.err == nil {
// ignore error // ignore error

View File

@ -1,6 +1,7 @@
package backend package backend
import ( import (
"context"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
@ -13,6 +14,7 @@ import (
"github.com/peterbourgon/unixtransport" "github.com/peterbourgon/unixtransport"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
) )
// TransportOptions collects various options which can be set for an HTTP based // TransportOptions collects various options which can be set for an HTTP based
@ -66,14 +68,28 @@ func readPEMCertKey(filename string) (certs []byte, key []byte, err error) {
// a custom rootCertFilename is non-empty, it must point to a valid PEM file, // a custom rootCertFilename is non-empty, it must point to a valid PEM file,
// otherwise the function will return an error. // otherwise the function will return an error.
func Transport(opts TransportOptions) (http.RoundTripper, error) { func Transport(opts TransportOptions) (http.RoundTripper, error) {
dial := (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext
dialTimeout := dial
if feature.Flag.Enabled(feature.HTTPTimeouts) {
// inject timeoutConn to enforce progress
dialTimeout = func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := dial(ctx, network, addr)
if err != nil {
return conn, err
}
return newTimeoutConn(conn, 5*time.Minute)
}
}
// copied from net/http // copied from net/http
tr := &http.Transport{ tr := &http.Transport{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{ DialContext: dialTimeout,
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true, ForceAttemptHTTP2: true,
MaxIdleConns: 100, MaxIdleConns: 100,
MaxIdleConnsPerHost: 100, MaxIdleConnsPerHost: 100,

View File

@ -0,0 +1,172 @@
package backend
import (
"net"
"os"
"sync"
"time"
)
// timeoutConn will timeout if no read or write progress is made for progressTimeout.
// This ensures that stuck network connections are interrupted after some time.
// By using a timeoutConn within a http transport (via DialContext), sending / receing
// the request / response body is guarded with a timeout. The read progress part also
// limits the time until a response header must be received.
//
// The progressTimeout must be larger than the IdleConnTimeout of the http transport.
//
// The http2.Transport offers a similar functionality via WriteByteTimeout & ReadIdleTimeout.
// However, those are not available for HTTP/1 connections. Thus, there's no builtin way to
// enforce progress for sending the request body or reading the response body.
// See https://github.com/restic/restic/issues/4193#issuecomment-2067988727 for details.
type timeoutConn struct {
conn net.Conn
// timeout within which a read/write must make progress, otherwise a connection is considered broken
// if no read/write is pending, then the timeout is inactive
progressTimeout time.Duration
// all access to fields below must hold m
m sync.Mutex
// user defined read/write deadline
readDeadline time.Time
writeDeadline time.Time
// timestamp of last successful write (at least one byte)
lastWrite time.Time
}
var _ net.Conn = &timeoutConn{}
func newTimeoutConn(conn net.Conn, progressTimeout time.Duration) (*timeoutConn, error) {
// reset timeouts to ensure a consistent state
err := conn.SetDeadline(time.Time{})
if err != nil {
return nil, err
}
return &timeoutConn{
conn: conn,
progressTimeout: progressTimeout,
}, nil
}
func (t *timeoutConn) Write(p []byte) (n int, err error) {
t.m.Lock()
timeout := t.writeDeadline
t.m.Unlock()
var zero time.Time
if timeout != zero {
// fall back to standard behavior if a timeout was set explicitly
n, err := t.conn.Write(p)
if n > 0 {
t.m.Lock()
t.lastWrite = time.Now()
t.m.Unlock()
}
return n, err
}
// based on http2stickyErrWriter.Write from go/src/net/http/h2_bundle.go
for {
_ = t.conn.SetWriteDeadline(time.Now().Add(t.progressTimeout))
nn, err := t.conn.Write(p[n:])
n += nn
if nn > 0 {
// track write progress
t.m.Lock()
t.lastWrite = time.Now()
t.m.Unlock()
}
if n < len(p) && nn > 0 && err == os.ErrDeadlineExceeded {
// some data is still left to send, keep going as long as there is some progress
continue
}
t.m.Lock()
// restore configured deadline
_ = t.conn.SetWriteDeadline(t.writeDeadline)
t.m.Unlock()
return n, err
}
}
func (t *timeoutConn) Read(b []byte) (n int, err error) {
t.m.Lock()
timeout := t.readDeadline
t.m.Unlock()
var zero time.Time
if timeout != zero {
// fall back to standard behavior if a timeout was set explicitly
return t.conn.Read(b)
}
var start = time.Now()
for {
_ = t.conn.SetReadDeadline(start.Add(t.progressTimeout))
nn, err := t.conn.Read(b)
t.m.Lock()
lastWrite := t.lastWrite
t.m.Unlock()
if nn == 0 && err == os.ErrDeadlineExceeded && lastWrite.After(start) {
// deadline exceeded, but write made some progress in the meantime
start = lastWrite
continue
}
t.m.Lock()
// restore configured deadline
_ = t.conn.SetReadDeadline(t.readDeadline)
t.m.Unlock()
return nn, err
}
}
func (t *timeoutConn) Close() error {
return t.conn.Close()
}
func (t *timeoutConn) LocalAddr() net.Addr {
return t.conn.LocalAddr()
}
func (t *timeoutConn) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}
func (t *timeoutConn) SetDeadline(d time.Time) error {
err := t.SetReadDeadline(d)
err2 := t.SetWriteDeadline(d)
if err != nil {
return err
}
return err2
}
func (t *timeoutConn) SetReadDeadline(d time.Time) error {
t.m.Lock()
defer t.m.Unlock()
// track timeout modifications, as the current timeout cannot be queried
err := t.conn.SetReadDeadline(d)
if err != nil {
return err
}
t.readDeadline = d
return nil
}
func (t *timeoutConn) SetWriteDeadline(d time.Time) error {
t.m.Lock()
defer t.m.Unlock()
err := t.conn.SetWriteDeadline(d)
if err != nil {
return err
}
t.writeDeadline = d
return nil
}

View File

@ -106,9 +106,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error {
return err return err
} }
func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID]restic.BlobType { func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) {
packs := make(map[restic.ID]restic.BlobType) packs := make(map[restic.ID]restic.BlobType)
idx.Each(ctx, func(pb restic.PackedBlob) { err := idx.Each(ctx, func(pb restic.PackedBlob) {
tpe, exists := packs[pb.PackID] tpe, exists := packs[pb.PackID]
if exists { if exists {
if pb.Type != tpe { if pb.Type != tpe {
@ -119,7 +119,7 @@ func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID
} }
packs[pb.PackID] = tpe packs[pb.PackID] = tpe
}) })
return packs return packs, err
} }
// LoadIndex loads all index files. // LoadIndex loads all index files.
@ -169,7 +169,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
debug.Log("process blobs") debug.Log("process blobs")
cnt := 0 cnt := 0
index.Each(ctx, func(blob restic.PackedBlob) { err = index.Each(ctx, func(blob restic.PackedBlob) {
cnt++ cnt++
if _, ok := packToIndex[blob.PackID]; !ok { if _, ok := packToIndex[blob.PackID]; !ok {
@ -179,7 +179,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
}) })
debug.Log("%d blobs processed", cnt) debug.Log("%d blobs processed", cnt)
return nil return err
}) })
if err != nil { if err != nil {
errs = append(errs, err) errs = append(errs, err)
@ -193,8 +193,14 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
} }
// compute pack size using index entries // compute pack size using index entries
c.packs = pack.Size(ctx, c.masterIndex, false) c.packs, err = pack.Size(ctx, c.masterIndex, false)
packTypes := computePackTypes(ctx, c.masterIndex) if err != nil {
return hints, append(errs, err)
}
packTypes, err := computePackTypes(ctx, c.masterIndex)
if err != nil {
return hints, append(errs, err)
}
debug.Log("checking for duplicate packs") debug.Log("checking for duplicate packs")
for packID := range c.packs { for packID := range c.packs {
@ -484,7 +490,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
} }
// UnusedBlobs returns all blobs that have never been referenced. // UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) { func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, err error) {
if !c.trackUnused { if !c.trackUnused {
panic("only works when tracking blob references") panic("only works when tracking blob references")
} }
@ -495,7 +501,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
c.repo.Index().Each(ctx, func(blob restic.PackedBlob) { err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) { if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h) debug.Log("blob %v not referenced", h)
@ -503,7 +509,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
} }
}) })
return blobs return blobs, err
} }
// CountPacks returns the number of packs in the repository. // CountPacks returns the number of packs in the repository.

View File

@ -180,7 +180,8 @@ func TestUnreferencedBlobs(t *testing.T) {
test.OKs(t, checkPacks(chkr)) test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr)) test.OKs(t, checkStruct(chkr))
blobs := chkr.UnusedBlobs(context.TODO()) blobs, err := chkr.UnusedBlobs(context.TODO())
test.OK(t, err)
sort.Sort(blobs) sort.Sort(blobs)
test.Equals(t, unusedBlobsBySnapshot, blobs) test.Equals(t, unusedBlobsBySnapshot, blobs)

View File

@ -43,7 +43,10 @@ func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) {
} }
// unused blobs // unused blobs
blobs := chkr.UnusedBlobs(context.TODO()) blobs, err := chkr.UnusedBlobs(context.TODO())
if err != nil {
t.Error(err)
}
if len(blobs) > 0 { if len(blobs) > 0 {
t.Errorf("unused blobs found: %v", blobs) t.Errorf("unused blobs found: %v", blobs)
} }

View File

@ -8,6 +8,7 @@ const (
DeprecateLegacyIndex FlagName = "deprecate-legacy-index" DeprecateLegacyIndex FlagName = "deprecate-legacy-index"
DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout" DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
DeviceIDForHardlinks FlagName = "device-id-for-hardlinks" DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
HTTPTimeouts FlagName = "http-timeouts"
) )
func init() { func init() {
@ -15,5 +16,6 @@ func init() {
DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."}, DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."}, DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"}, DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
HTTPTimeouts: {Type: Beta, Description: "improve handling of stuck HTTP connections using timeouts."},
}) })
} }

View File

@ -218,7 +218,7 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
// Each passes all blobs known to the index to the callback fn. This blocks any // Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index. // modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) { func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
idx.m.Lock() idx.m.Lock()
defer idx.m.Unlock() defer idx.m.Unlock()
@ -232,6 +232,7 @@ func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
return true return true
}) })
} }
return ctx.Err()
} }
type EachByPackResult struct { type EachByPackResult struct {

View File

@ -339,7 +339,7 @@ func TestIndexUnserialize(t *testing.T) {
rtest.Equals(t, oldIdx, idx.Supersedes()) rtest.Equals(t, oldIdx, idx.Supersedes())
blobs := listPack(idx, exampleLookupTest.packID) blobs := listPack(t, idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) { if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs)) t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
} }
@ -356,12 +356,12 @@ func TestIndexUnserialize(t *testing.T) {
} }
} }
func listPack(idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) { func listPack(t testing.TB, idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
idx.Each(context.TODO(), func(pb restic.PackedBlob) { rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) { if pb.PackID.Equal(id) {
pbs = append(pbs, pb) pbs = append(pbs, pb)
} }
}) }))
return pbs return pbs
} }

View File

@ -223,13 +223,16 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
// Each runs fn on all blobs known to the index. When the context is cancelled, // Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index. // the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) { func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock() mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock() defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx { for _, idx := range mi.idx {
idx.Each(ctx, fn) if err := idx.Each(ctx, fn); err != nil {
return err
}
} }
return nil
} }
// MergeFinalIndexes merges all final indexes together. // MergeFinalIndexes merges all final indexes together.
@ -320,6 +323,9 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, exclude
newIndex = NewIndex() newIndex = NewIndex()
} }
} }
if wgCtx.Err() != nil {
return wgCtx.Err()
}
} }
err := newIndex.AddToSupersedes(extraObsolete...) err := newIndex.AddToSupersedes(extraObsolete...)
@ -426,10 +432,6 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
defer close(out) defer close(out)
// only resort a part of the index to keep the memory overhead bounded // only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ { for i := byte(0); i < 16; i++ {
if ctx.Err() != nil {
return
}
packBlob := make(map[restic.ID][]restic.Blob) packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs { for pack := range packs {
if pack[0]&0xf == i { if pack[0]&0xf == i {
@ -439,11 +441,14 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 { if len(packBlob) == 0 {
continue continue
} }
mi.Each(ctx, func(pb restic.PackedBlob) { err := mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i { if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob) packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
} }
}) })
if err != nil {
return
}
// pass on packs // pass on packs
for packID, pbs := range packBlob { for packID, pbs := range packBlob {

View File

@ -166,9 +166,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, 1, idxCount) rtest.Equals(t, 1, idxCount)
blobCount := 0 blobCount := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++ blobCount++
}) }))
rtest.Equals(t, 2, blobCount) rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1) blobs := mIdx.Lookup(bhInIdx1)
@ -198,9 +198,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs) rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0 blobCount = 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++ blobCount++
}) }))
rtest.Equals(t, 2, blobCount) rtest.Equals(t, 2, blobCount)
} }
@ -319,9 +319,9 @@ func BenchmarkMasterIndexEach(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
entries := 0 entries := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { rtest.OK(b, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++ entries++
}) }))
} }
} }

View File

@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
// If onlyHdr is set to true, only the size of the header is returned // If onlyHdr is set to true, only the size of the header is returned
// Note that this function only gives correct sizes, if there are no // Note that this function only gives correct sizes, if there are no
// duplicates in the index. // duplicates in the index.
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 { func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) {
packSize := make(map[restic.ID]int64) packSize := make(map[restic.ID]int64)
mi.Each(ctx, func(blob restic.PackedBlob) { err := mi.Each(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID] size, ok := packSize[blob.PackID]
if !ok { if !ok {
size = headerSize size = headerSize
@ -403,5 +403,5 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob)) packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
}) })
return packSize return packSize, err
} }

View File

@ -124,12 +124,15 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
blobCount := keepBlobs.Len() blobCount := keepBlobs.Len()
// when repacking, we do not want to keep blobs which are // when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs // already contained in kept packs, so delete them from keepBlobs
repo.Index().Each(ctx, func(blob restic.PackedBlob) { err := repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
return return
} }
keepBlobs.Delete(blob.BlobHandle) keepBlobs.Delete(blob.BlobHandle)
}) })
if err != nil {
return nil, err
}
if keepBlobs.Len() < blobCount/2 { if keepBlobs.Len() < blobCount/2 {
// replace with copy to shrink map to necessary size if there's a chance to benefit // replace with copy to shrink map to necessary size if there's a chance to benefit
@ -152,7 +155,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// iterate over all blobs in index to find out which blobs are duplicates // iterate over all blobs in index to find out which blobs are duplicates
// The counter in usedBlobs describes how many instances of the blob exist in the repository index // The counter in usedBlobs describes how many instances of the blob exist in the repository index
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist // Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
idx.Each(ctx, func(blob restic.PackedBlob) { err := idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle bh := blob.BlobHandle
count, ok := usedBlobs[bh] count, ok := usedBlobs[bh]
if ok { if ok {
@ -166,6 +169,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
usedBlobs[bh] = count usedBlobs[bh] = count
} }
}) })
if err != nil {
return nil, nil, err
}
// Check if all used blobs have been found in index // Check if all used blobs have been found in index
missingBlobs := restic.NewBlobSet() missingBlobs := restic.NewBlobSet()
@ -188,14 +194,18 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
indexPack := make(map[restic.ID]packInfo) indexPack := make(map[restic.ID]packInfo)
// save computed pack header size // save computed pack header size
for pid, hdrSize := range pack.Size(ctx, idx, true) { sz, err := pack.Size(ctx, idx, true)
if err != nil {
return nil, nil, err
}
for pid, hdrSize := range sz {
// initialize tpe with NumBlobTypes to indicate it's not set // initialize tpe with NumBlobTypes to indicate it's not set
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)} indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
} }
hasDuplicates := false hasDuplicates := false
// iterate over all blobs in index to generate packInfo // iterate over all blobs in index to generate packInfo
idx.Each(ctx, func(blob restic.PackedBlob) { err = idx.Each(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID] ip := indexPack[blob.PackID]
// Set blob type if not yet set // Set blob type if not yet set
@ -240,6 +250,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack // update indexPack
indexPack[blob.PackID] = ip indexPack[blob.PackID] = ip
}) })
if err != nil {
return nil, nil, err
}
// if duplicate blobs exist, those will be set to either "used" or "unused": // if duplicate blobs exist, those will be set to either "used" or "unused":
// - mark only one occurrence of duplicate blobs as used // - mark only one occurrence of duplicate blobs as used
@ -247,7 +260,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// - if there are no used blobs in a pack, possibly mark duplicates as "unused" // - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if hasDuplicates { if hasDuplicates {
// iterate again over all blobs in index (this is pretty cheap, all in-mem) // iterate again over all blobs in index (this is pretty cheap, all in-mem)
idx.Each(ctx, func(blob restic.PackedBlob) { err = idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle bh := blob.BlobHandle
count, ok := usedBlobs[bh] count, ok := usedBlobs[bh]
// skip non-duplicate, aka. normal blobs // skip non-duplicate, aka. normal blobs
@ -285,6 +298,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack // update indexPack
indexPack[blob.PackID] = ip indexPack[blob.PackID] = ip
}) })
if err != nil {
return nil, nil, err
}
} }
// Sanity check. If no duplicates exist, all blobs have value 1. After handling // Sanity check. If no duplicates exist, all blobs have value 1. After handling
@ -528,6 +544,9 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
printer.P("deleting unreferenced packs\n") printer.P("deleting unreferenced packs\n")
_ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer) _ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer)
} }
if ctx.Err() != nil {
return ctx.Err()
}
if len(plan.repackPacks) != 0 { if len(plan.repackPacks) != 0 {
printer.P("repacking packs\n") printer.P("repacking packs\n")
@ -578,6 +597,9 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
printer.P("removing %d old packs\n", len(plan.removePacks)) printer.P("removing %d old packs\n", len(plan.removePacks))
_ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer) _ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer)
} }
if ctx.Err() != nil {
return ctx.Err()
}
if plan.opts.UnsafeRecovery { if plan.opts.UnsafeRecovery {
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer)

View File

@ -72,7 +72,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return wgCtx.Err() return wgCtx.Err()
} }
} }
return nil return wgCtx.Err()
}) })
worker := func() error { worker := func() error {

View File

@ -54,7 +54,10 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
if err != nil { if err != nil {
return err return err
} }
packSizeFromIndex = pack.Size(ctx, repo.Index(), false) packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false)
if err != nil {
return err
}
} }
printer.P("getting pack files to read...\n") printer.P("getting pack files to read...\n")

View File

@ -17,7 +17,7 @@ import (
func listBlobs(repo restic.Repository) restic.BlobSet { func listBlobs(repo restic.Repository) restic.BlobSet {
blobs := restic.NewBlobSet() blobs := restic.NewBlobSet()
repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) { _ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
blobs.Insert(pb.BlobHandle) blobs.Insert(pb.BlobHandle)
}) })
return blobs return blobs

View File

@ -704,15 +704,21 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
defer cancel() defer cancel()
invalidIndex := false invalidIndex := false
r.idx.Each(ctx, func(blob restic.PackedBlob) { err := r.idx.Each(ctx, func(blob restic.PackedBlob) {
if blob.IsCompressed() { if blob.IsCompressed() {
invalidIndex = true invalidIndex = true
} }
}) })
if err != nil {
return err
}
if invalidIndex { if invalidIndex {
return errors.New("index uses feature not supported by repository version 1") return errors.New("index uses feature not supported by repository version 1")
} }
} }
if ctx.Err() != nil {
return ctx.Err()
}
// remove index files from the cache which have been removed in the repo // remove index files from the cache which have been removed in the repo
return r.prepareCache() return r.prepareCache()

View File

@ -370,13 +370,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id) idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err) rtest.OK(t, err)
idx.Each(context.TODO(), func(pb restic.PackedBlob) { rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := packEntries[pb.PackID]; !ok { if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{}) packEntries[pb.PackID] = make(map[restic.ID]struct{})
} }
packEntries[pb.PackID][id] = struct{}{} packEntries[pb.PackID][id] = struct{}{}
}) }))
return nil return nil
}) })
if err != nil { if err != nil {

View File

@ -103,8 +103,8 @@ type MasterIndex interface {
Lookup(BlobHandle) []PackedBlob Lookup(BlobHandle) []PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled, // Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index. // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob)) Each(ctx context.Context, fn func(PackedBlob)) error
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error