diff --git a/cmd/restic/cmd_init.go b/cmd/restic/cmd_init.go index 058f1ed07..ee3ec4b10 100644 --- a/cmd/restic/cmd_init.go +++ b/cmd/restic/cmd_init.go @@ -86,7 +86,13 @@ func runInit(opts InitOptions, gopts GlobalOptions, args []string) error { return errors.Fatalf("create repository at %s failed: %v\n", location.StripPassword(gopts.Repo), err) } - s := repository.New(be, repository.Options{Compression: gopts.Compression}) + s, err := repository.New(be, repository.Options{ + Compression: gopts.Compression, + PackSize: gopts.MinPackSize * 1024 * 1024, + }) + if err != nil { + return err + } err = s.Init(gopts.ctx, version, gopts.password, chunkerPolynomial) if err != nil { diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 876e6e614..d52dee34f 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "strconv" "strings" "syscall" "time" @@ -62,6 +63,7 @@ type GlobalOptions struct { NoCache bool CleanupCache bool Compression repository.CompressionMode + MinPackSize uint backend.TransportOptions limiter.Limits @@ -102,6 +104,9 @@ func init() { return nil }) + // parse min pack size from env, on error the default value will be used + minPackSize, _ := strconv.ParseUint(os.Getenv("RESTIC_MIN_PACKSIZE"), 10, 32) + f := cmdRoot.PersistentFlags() f.StringVarP(&globalOptions.Repo, "repo", "r", os.Getenv("RESTIC_REPOSITORY"), "`repository` to backup to or restore from (default: $RESTIC_REPOSITORY)") f.StringVarP(&globalOptions.RepositoryFile, "repository-file", "", os.Getenv("RESTIC_REPOSITORY_FILE"), "`file` to read the repository location from (default: $RESTIC_REPOSITORY_FILE)") @@ -121,6 +126,7 @@ func init() { f.Var(&globalOptions.Compression, "compression", "compression mode (only available for repository format version 2), one of (auto|off|max)") f.IntVar(&globalOptions.Limits.UploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)") f.IntVar(&globalOptions.Limits.DownloadKb, "limit-download", 0, "limits downloads to a maximum rate in KiB/s. (default: unlimited)") + f.UintVar(&globalOptions.MinPackSize, "min-packsize", uint(minPackSize), "set min pack size in MiB. (default: $RESTIC_MIN_PACKSIZE)") f.StringSliceVarP(&globalOptions.Options, "option", "o", []string{}, "set extended option (`key=value`, can be specified multiple times)") // Use our "generate" command instead of the cobra provided "completion" command cmdRoot.CompletionOptions.DisableDefaultCmd = true @@ -440,7 +446,13 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) { } } - s := repository.New(be, repository.Options{Compression: opts.Compression}) + s, err := repository.New(be, repository.Options{ + Compression: opts.Compression, + PackSize: opts.MinPackSize * 1024 * 1024, + }) + if err != nil { + return nil, err + } passwordTriesLeft := 1 if stdinIsTerminal() && opts.password == "" { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index c82375e3c..b3a736152 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -348,7 +348,8 @@ func TestCheckerModifiedData(t *testing.T) { t.Logf("archived as %v", sn.ID().Str()) beError := &errorBackend{Backend: repo.Backend()} - checkRepo := repository.New(beError, repository.Options{}) + checkRepo, err := repository.New(beError, repository.Options{}) + test.OK(t, err) test.OK(t, checkRepo.SearchKey(context.TODO(), test.TestPassword, 5, "")) chkr := checker.New(checkRepo, false) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 32b2c9b7a..6179aab5c 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -34,19 +34,19 @@ type packerManager struct { key *crypto.Key queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error - pm sync.Mutex - packer *Packer + pm sync.Mutex + packer *Packer + packSize uint } -const minPackSize = 4 * 1024 * 1024 - // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - tpe: tpe, - key: key, - queueFn: queueFn, + tpe: tpe, + key: key, + queueFn: queueFn, + packSize: packSize, } } @@ -88,7 +88,7 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest } // if the pack is not full enough, put back to the list - if packer.Size() < minPackSize { + if packer.Size() < r.packSize { debug.Log("pack is not full enough (%d bytes)", packer.Size()) return size, nil } diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 67a33c757..90f716e0d 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -31,7 +31,7 @@ func min(a, b int) int { } func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) { - for i := 0; i < 100; i++ { + for i := 0; i < 102; i++ { l := rnd.Intn(maxBlobSize) id := randomID(rnd) buf = buf[:l] @@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) savedBytes := int(0) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error { err := p.Finalize() if err != nil { return err @@ -104,7 +104,7 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error { return nil }) fillPacks(t, rnd, pm, blobBuf) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index c35bf1b76..2b7750648 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -28,6 +28,10 @@ import ( const MaxStreamBufferSize = 4 * 1024 * 1024 +const MinPackSize = 4 * 1024 * 1024 +const DefaultPackSize = 16 * 1024 * 1024 +const MaxPackSize = 128 * 1024 * 1024 + // Repository is used to access a repository in a backend. type Repository struct { be restic.Backend @@ -54,6 +58,7 @@ type Repository struct { type Options struct { Compression CompressionMode + PackSize uint } // CompressionMode configures if data should be compressed. @@ -100,14 +105,23 @@ func (c *CompressionMode) Type() string { } // New returns a new repository with backend be. -func New(be restic.Backend, opts Options) *Repository { +func New(be restic.Backend, opts Options) (*Repository, error) { + if opts.PackSize == 0 { + opts.PackSize = DefaultPackSize + } + if opts.PackSize > MaxPackSize { + return nil, errors.Fatalf("pack size larger than limit of %v MiB", MaxPackSize/1024/1024) + } else if opts.PackSize < MinPackSize { + return nil, errors.Fatalf("pack size smaller than minimum of %v MiB", MinPackSize/1024/1024) + } + repo := &Repository{ be: be, opts: opts, idx: NewMasterIndex(), } - return repo + return repo, nil } // DisableAutoIndexUpdate deactives the automatic finalization and upload of new @@ -129,6 +143,11 @@ func (r *Repository) Config() restic.Config { return r.cfg } +// MinPackSize return the minimum size of a pack file before uploading +func (r *Repository) MinPackSize() uint { + return r.opts.PackSize +} + // UseCache replaces the backend with the wrapped cache. func (r *Repository) UseCache(c *cache.Cache) { if c == nil { @@ -497,8 +516,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) - r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker) - r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.MinPackSize(), r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.MinPackSize(), r.uploader.QueuePacker) wg.Go(func() error { return innerWg.Wait() diff --git a/internal/repository/testing.go b/internal/repository/testing.go index b9b38b1f4..380a47d04 100644 --- a/internal/repository/testing.go +++ b/internal/repository/testing.go @@ -52,10 +52,13 @@ func TestRepositoryWithBackend(t testing.TB, be restic.Backend, version uint) (r be, beCleanup = TestBackend(t) } - repo := New(be, Options{}) + repo, err := New(be, Options{}) + if err != nil { + t.Fatalf("TestRepository(): new repo failed: %v", err) + } cfg := restic.TestCreateConfig(t, TestChunkerPol, version) - err := repo.init(context.TODO(), test.TestPassword, cfg) + err = repo.init(context.TODO(), test.TestPassword, cfg) if err != nil { t.Fatalf("TestRepository(): initialize repo failed: %v", err) } @@ -104,7 +107,10 @@ func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) { t.Fatal(err) } - repo := New(be, Options{}) + repo, err := New(be, Options{}) + if err != nil { + t.Fatal(err) + } err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "") if err != nil { t.Fatal(err) diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 2bf12503f..1e64289e5 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -25,6 +25,7 @@ type Repository interface { LookupBlobSize(ID, BlobType) (uint, bool) Config() Config + MinPackSize() uint // List calls the function fn for each file of type t in the repository. // When an error is returned by fn, processing stops and List() returns the