local: Limit concurrent backend operations

Use a limit of 2 similar to the filereader concurrency in the archiver.
This commit is contained in:
Michael Eischer 2021-08-07 19:50:00 +02:00
parent 0b258cc054
commit cd783358d3
7 changed files with 88 additions and 32 deletions

View File

@ -11,6 +11,15 @@ import (
type Config struct { type Config struct {
Path string Path string
Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"` Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"`
Connections uint `option:"connections" help:"set a limit for the number of concurrent operations (default: 2)"`
}
// NewConfig returns a new config with default options applied.
func NewConfig() Config {
return Config{
Connections: 2,
}
} }
func init() { func init() {
@ -18,10 +27,12 @@ func init() {
} }
// ParseConfig parses a local backend config. // ParseConfig parses a local backend config.
func ParseConfig(cfg string) (interface{}, error) { func ParseConfig(s string) (interface{}, error) {
if !strings.HasPrefix(cfg, "local:") { if !strings.HasPrefix(s, "local:") {
return nil, errors.New(`invalid format, prefix "local" not found`) return nil, errors.New(`invalid format, prefix "local" not found`)
} }
return Config{Path: cfg[6:]}, nil cfg := NewConfig()
cfg.Path = s[6:]
return cfg, nil
} }

View File

@ -37,8 +37,9 @@ func TestLayout(t *testing.T) {
repo := filepath.Join(path, "repo") repo := filepath.Join(path, "repo")
be, err := Open(context.TODO(), Config{ be, err := Open(context.TODO(), Config{
Path: repo, Path: repo,
Layout: test.layout, Layout: test.layout,
Connections: 2,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -22,6 +22,7 @@ import (
// Local is a backend in a local directory. // Local is a backend in a local directory.
type Local struct { type Local struct {
Config Config
sem *backend.Semaphore
backend.Layout backend.Layout
} }
@ -30,15 +31,28 @@ var _ restic.Backend = &Local{}
const defaultLayout = "default" const defaultLayout = "default"
// Open opens the local backend as specified by config. func open(ctx context.Context, cfg Config) (*Local, error) {
func Open(ctx context.Context, cfg Config) (*Local, error) {
debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout)
l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path) l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Local{Config: cfg, Layout: l}, nil sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
return &Local{
Config: cfg,
Layout: l,
sem: sem,
}, nil
}
// Open opens the local backend as specified by config.
func Open(ctx context.Context, cfg Config) (*Local, error) {
debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout)
return open(ctx, cfg)
} }
// Create creates all the necessary files and directories for a new local // Create creates all the necessary files and directories for a new local
@ -46,16 +60,11 @@ func Open(ctx context.Context, cfg Config) (*Local, error) {
func Create(ctx context.Context, cfg Config) (*Local, error) { func Create(ctx context.Context, cfg Config) (*Local, error) {
debug.Log("create local backend at %v (layout %q)", cfg.Path, cfg.Layout) debug.Log("create local backend at %v (layout %q)", cfg.Path, cfg.Layout)
l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path) be, err := open(ctx, cfg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
be := &Local{
Config: cfg,
Layout: l,
}
// test if config file already exists // test if config file already exists
_, err = fs.Lstat(be.Filename(restic.Handle{Type: restic.ConfigFile})) _, err = fs.Lstat(be.Filename(restic.Handle{Type: restic.ConfigFile}))
if err == nil { if err == nil {
@ -73,6 +82,10 @@ func Create(ctx context.Context, cfg Config) (*Local, error) {
return be, nil return be, nil
} }
func (b *Local) Connections() uint {
return b.Config.Connections
}
// Location returns this backend's location (the directory name). // Location returns this backend's location (the directory name).
func (b *Local) Location() string { func (b *Local) Location() string {
return b.Path return b.Path
@ -105,6 +118,9 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade
} }
}() }()
b.sem.GetToken()
defer b.sem.ReleaseToken()
// Create new file with a temporary name. // Create new file with a temporary name.
tmpname := filepath.Base(finalname) + "-tmp-" tmpname := filepath.Base(finalname) + "-tmp-"
f, err := tempFile(dir, tmpname) f, err := tempFile(dir, tmpname)
@ -199,24 +215,29 @@ func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, off
return nil, errors.New("offset is negative") return nil, errors.New("offset is negative")
} }
b.sem.GetToken()
f, err := fs.Open(b.Filename(h)) f, err := fs.Open(b.Filename(h))
if err != nil { if err != nil {
b.sem.ReleaseToken()
return nil, err return nil, err
} }
if offset > 0 { if offset > 0 {
_, err = f.Seek(offset, 0) _, err = f.Seek(offset, 0)
if err != nil { if err != nil {
b.sem.ReleaseToken()
_ = f.Close() _ = f.Close()
return nil, err return nil, err
} }
} }
r := b.sem.ReleaseTokenOnClose(f, nil)
if length > 0 { if length > 0 {
return backend.LimitReadCloser(f, int64(length)), nil return backend.LimitReadCloser(r, int64(length)), nil
} }
return f, nil return r, nil
} }
// Stat returns information about a blob. // Stat returns information about a blob.
@ -226,6 +247,9 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
return restic.FileInfo{}, backoff.Permanent(err) return restic.FileInfo{}, backoff.Permanent(err)
} }
b.sem.GetToken()
defer b.sem.ReleaseToken()
fi, err := fs.Stat(b.Filename(h)) fi, err := fs.Stat(b.Filename(h))
if err != nil { if err != nil {
return restic.FileInfo{}, errors.WithStack(err) return restic.FileInfo{}, errors.WithStack(err)
@ -237,6 +261,10 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err
// Test returns true if a blob of the given type and name exists in the backend. // Test returns true if a blob of the given type and name exists in the backend.
func (b *Local) Test(ctx context.Context, h restic.Handle) (bool, error) { func (b *Local) Test(ctx context.Context, h restic.Handle) (bool, error) {
debug.Log("Test %v", h) debug.Log("Test %v", h)
b.sem.GetToken()
defer b.sem.ReleaseToken()
_, err := fs.Stat(b.Filename(h)) _, err := fs.Stat(b.Filename(h))
if err != nil { if err != nil {
if b.IsNotExist(err) { if b.IsNotExist(err) {
@ -253,6 +281,9 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error {
debug.Log("Remove %v", h) debug.Log("Remove %v", h)
fn := b.Filename(h) fn := b.Filename(h)
b.sem.GetToken()
defer b.sem.ReleaseToken()
// reset read-only flag // reset read-only flag
err := fs.Chmod(fn, 0666) err := fs.Chmod(fn, 0666)
if err != nil && !os.IsPermission(err) { if err != nil && !os.IsPermission(err) {

View File

@ -27,7 +27,7 @@ func TestNoSpacePermanent(t *testing.T) {
dir, cleanup := rtest.TempDir(t) dir, cleanup := rtest.TempDir(t)
defer cleanup() defer cleanup()
be, err := Open(context.Background(), Config{Path: dir}) be, err := Open(context.Background(), Config{Path: dir, Connections: 2})
rtest.OK(t, err) rtest.OK(t, err)
defer func() { defer func() {
rtest.OK(t, be.Close()) rtest.OK(t, be.Close())

View File

@ -25,7 +25,8 @@ func newTestSuite(t testing.TB) *test.Suite {
t.Logf("create new backend at %v", dir) t.Logf("create new backend at %v", dir)
cfg := local.Config{ cfg := local.Config{
Path: dir, Path: dir,
Connections: 2,
} }
return cfg, nil return cfg, nil
}, },

View File

@ -30,7 +30,8 @@ var parseTests = []struct {
"local:/srv/repo", "local:/srv/repo",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "/srv/repo", Path: "/srv/repo",
Connections: 2,
}, },
}, },
}, },
@ -38,7 +39,8 @@ var parseTests = []struct {
"local:dir1/dir2", "local:dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "dir1/dir2", Path: "dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -46,7 +48,8 @@ var parseTests = []struct {
"local:dir1/dir2", "local:dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "dir1/dir2", Path: "dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -54,7 +57,8 @@ var parseTests = []struct {
"dir1/dir2", "dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "dir1/dir2", Path: "dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -62,7 +66,8 @@ var parseTests = []struct {
"/dir1/dir2", "/dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "/dir1/dir2", Path: "/dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -70,7 +75,8 @@ var parseTests = []struct {
"local:../dir1/dir2", "local:../dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "../dir1/dir2", Path: "../dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -78,7 +84,8 @@ var parseTests = []struct {
"/dir1/dir2", "/dir1/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "/dir1/dir2", Path: "/dir1/dir2",
Connections: 2,
}, },
}, },
}, },
@ -86,7 +93,8 @@ var parseTests = []struct {
"/dir1:foobar/dir2", "/dir1:foobar/dir2",
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: "/dir1:foobar/dir2", Path: "/dir1:foobar/dir2",
Connections: 2,
}, },
}, },
}, },
@ -94,7 +102,8 @@ var parseTests = []struct {
`\dir1\foobar\dir2`, `\dir1\foobar\dir2`,
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: `\dir1\foobar\dir2`, Path: `\dir1\foobar\dir2`,
Connections: 2,
}, },
}, },
}, },
@ -102,7 +111,8 @@ var parseTests = []struct {
`c:\dir1\foobar\dir2`, `c:\dir1\foobar\dir2`,
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: `c:\dir1\foobar\dir2`, Path: `c:\dir1\foobar\dir2`,
Connections: 2,
}, },
}, },
}, },
@ -110,7 +120,8 @@ var parseTests = []struct {
`C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`, `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`, Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`,
Connections: 2,
}, },
}, },
}, },
@ -118,7 +129,8 @@ var parseTests = []struct {
`c:/dir1/foobar/dir2`, `c:/dir1/foobar/dir2`,
Location{Scheme: "local", Location{Scheme: "local",
Config: local.Config{ Config: local.Config{
Path: `c:/dir1/foobar/dir2`, Path: `c:/dir1/foobar/dir2`,
Connections: 2,
}, },
}, },
}, },

View File

@ -93,7 +93,7 @@ func TestRepository(t testing.TB) (r restic.Repository, cleanup func()) {
// TestOpenLocal opens a local repository. // TestOpenLocal opens a local repository.
func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) { func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) {
be, err := local.Open(context.TODO(), local.Config{Path: dir}) be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }