diff --git a/internal/backend/location/location_test.go b/internal/backend/location/location_test.go index ded9450e9..809379850 100644 --- a/internal/backend/location/location_test.go +++ b/internal/backend/location/location_test.go @@ -138,9 +138,10 @@ var parseTests = []struct { "sftp:user@host:/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "/srv/repo", + User: "user", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, @@ -148,9 +149,10 @@ var parseTests = []struct { "sftp:host:/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "", - Host: "host", - Path: "/srv/repo", + User: "", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, @@ -158,9 +160,10 @@ var parseTests = []struct { "sftp://user@host/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "srv/repo", + User: "user", + Host: "host", + Path: "srv/repo", + Connections: 5, }, }, }, @@ -168,9 +171,10 @@ var parseTests = []struct { "sftp://user@host//srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "/srv/repo", + User: "user", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, diff --git a/internal/backend/sftp/config.go b/internal/backend/sftp/config.go index d5e0e5182..3b3d622a0 100644 --- a/internal/backend/sftp/config.go +++ b/internal/backend/sftp/config.go @@ -15,6 +15,15 @@ type Config struct { Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"` Command string `option:"command" help:"specify command to create sftp connection"` + + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` +} + +// NewConfig returns a new config with default options applied. +func NewConfig() Config { + return Config{ + Connections: 5, + } } func init() { @@ -75,10 +84,11 @@ func ParseConfig(s string) (interface{}, error) { return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.") } - return Config{ - User: user, - Host: host, - Port: port, - Path: p, - }, nil + cfg := NewConfig() + cfg.User = user + cfg.Host = host + cfg.Port = port + cfg.Path = p + + return cfg, nil } diff --git a/internal/backend/sftp/config_test.go b/internal/backend/sftp/config_test.go index d785a4113..3772c038b 100644 --- a/internal/backend/sftp/config_test.go +++ b/internal/backend/sftp/config_test.go @@ -11,68 +11,68 @@ var configTests = []struct { // first form, user specified sftp://user@host/dir { "sftp://user@host/dir/subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, { "sftp://host/dir/subdir", - Config{Host: "host", Path: "dir/subdir"}, + Config{Host: "host", Path: "dir/subdir", Connections: 5}, }, { "sftp://host//dir/subdir", - Config{Host: "host", Path: "/dir/subdir"}, + Config{Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp://host:10022//dir/subdir", - Config{Host: "host", Port: "10022", Path: "/dir/subdir"}, + Config{Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5}, }, { "sftp://user@host:10022//dir/subdir", - Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir"}, + Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5}, }, { "sftp://user@host/dir/subdir/../other", - Config{User: "user", Host: "host", Path: "dir/other"}, + Config{User: "user", Host: "host", Path: "dir/other", Connections: 5}, }, { "sftp://user@host/dir///subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, // IPv6 address. { "sftp://user@[::1]/dir", - Config{User: "user", Host: "::1", Path: "dir"}, + Config{User: "user", Host: "::1", Path: "dir", Connections: 5}, }, // IPv6 address with port. { "sftp://user@[::1]:22/dir", - Config{User: "user", Host: "::1", Port: "22", Path: "dir"}, + Config{User: "user", Host: "::1", Port: "22", Path: "dir", Connections: 5}, }, // second form, user specified sftp:user@host:/dir { "sftp:user@host:/dir/subdir", - Config{User: "user", Host: "host", Path: "/dir/subdir"}, + Config{User: "user", Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp:user@domain@host:/dir/subdir", - Config{User: "user@domain", Host: "host", Path: "/dir/subdir"}, + Config{User: "user@domain", Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp:host:../dir/subdir", - Config{Host: "host", Path: "../dir/subdir"}, + Config{Host: "host", Path: "../dir/subdir", Connections: 5}, }, { "sftp:user@host:dir/subdir:suffix", - Config{User: "user", Host: "host", Path: "dir/subdir:suffix"}, + Config{User: "user", Host: "host", Path: "dir/subdir:suffix", Connections: 5}, }, { "sftp:user@host:dir/subdir/../other", - Config{User: "user", Host: "host", Path: "dir/other"}, + Config{User: "user", Host: "host", Path: "dir/other", Connections: 5}, }, { "sftp:user@host:dir///subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, } diff --git a/internal/backend/sftp/layout_test.go b/internal/backend/sftp/layout_test.go index 0d0214669..3b654b1bb 100644 --- a/internal/backend/sftp/layout_test.go +++ b/internal/backend/sftp/layout_test.go @@ -43,9 +43,10 @@ func TestLayout(t *testing.T) { repo := filepath.Join(path, "repo") be, err := sftp.Open(context.TODO(), sftp.Config{ - Command: fmt.Sprintf("%q -e", sftpServer), - Path: repo, - Layout: test.layout, + Command: fmt.Sprintf("%q -e", sftpServer), + Path: repo, + Layout: test.layout, + Connections: 5, }) if err != nil { t.Fatal(err) diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 3e803a0f4..ad38e19ab 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -31,6 +31,7 @@ type SFTP struct { cmd *exec.Cmd result <-chan error + sem *backend.Semaphore backend.Layout Config } @@ -116,6 +117,11 @@ func (r *SFTP) clientError() error { func Open(ctx context.Context, cfg Config) (*SFTP, error) { debug.Log("open backend with config %#v", cfg) + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + cmd, args, err := buildSSHCommand(cfg) if err != nil { return nil, err @@ -136,6 +142,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) { sftp.Config = cfg sftp.p = cfg.Path + sftp.sem = sem return sftp, nil } @@ -238,6 +245,10 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) { return Open(ctx, cfg) } +func (r *SFTP) Connections() uint { + return r.Config.Connections +} + // Location returns this backend's location (the directory name). func (r *SFTP) Location() string { return r.p @@ -280,6 +291,9 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader tmpFilename := filename + "-restic-temp-" + tempSuffix() dirname := r.Dirname(h) + r.sem.GetToken() + defer r.sem.ReleaseToken() + // create new file f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) @@ -371,6 +385,19 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) } +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + io.WriterTo + f func() +} + +func (wr *wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} + func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { @@ -381,26 +408,38 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs return nil, errors.New("offset is negative") } + r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { + r.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { + r.sem.ReleaseToken() _ = f.Close() return nil, err } } + // use custom close wrapper to also provide WriteTo() on the wrapper + rd := &wrapReader{ + ReadCloser: f, + WriterTo: f, + f: func() { + r.sem.ReleaseToken() + }, + } + if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(f, int64(length)), nil + return backend.LimitReadCloser(rd, int64(length)), nil } - return f, nil + return rd, nil } // Stat returns information about a blob. @@ -414,6 +453,9 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro return restic.FileInfo{}, backoff.Permanent(err) } + r.sem.GetToken() + defer r.sem.ReleaseToken() + fi, err := r.c.Lstat(r.Filename(h)) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "Lstat") @@ -429,6 +471,9 @@ func (r *SFTP) Test(ctx context.Context, h restic.Handle) (bool, error) { return false, err } + r.sem.GetToken() + defer r.sem.ReleaseToken() + _, err := r.c.Lstat(r.Filename(h)) if os.IsNotExist(errors.Cause(err)) { return false, nil @@ -448,6 +493,9 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { return err } + r.sem.GetToken() + defer r.sem.ReleaseToken() + return r.c.Remove(r.Filename(h)) } @@ -458,7 +506,14 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) - for walker.Step() { + for { + r.sem.GetToken() + ok := walker.Step() + r.sem.ReleaseToken() + if !ok { + break + } + if walker.Err() != nil { if r.IsNotExist(walker.Err()) { debug.Log("ignoring non-existing directory") diff --git a/internal/backend/sftp/sftp_test.go b/internal/backend/sftp/sftp_test.go index 61bc49dc8..f0573dcb5 100644 --- a/internal/backend/sftp/sftp_test.go +++ b/internal/backend/sftp/sftp_test.go @@ -42,8 +42,9 @@ func newTestSuite(t testing.TB) *test.Suite { t.Logf("create new backend at %v", dir) cfg := sftp.Config{ - Path: dir, - Command: fmt.Sprintf("%q -e", sftpServer), + Path: dir, + Command: fmt.Sprintf("%q -e", sftpServer), + Connections: 5, } return cfg, nil },