From 683ebef6c658efd09eee3896edab09a50068f8bc Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 6 Jun 2017 00:17:39 +0200 Subject: [PATCH] s3: Use semaphore --- src/restic/backend/s3/config.go | 21 ++++-- src/restic/backend/s3/config_test.go | 106 +++++++++++++++------------ src/restic/backend/s3/s3.go | 35 ++++----- src/restic/backend/s3/s3_test.go | 15 ++-- 4 files changed, 96 insertions(+), 81 deletions(-) diff --git a/src/restic/backend/s3/config.go b/src/restic/backend/s3/config.go index 6cf7db9c1..3ae6038ab 100644 --- a/src/restic/backend/s3/config.go +++ b/src/restic/backend/s3/config.go @@ -18,6 +18,15 @@ type Config struct { Bucket string Prefix string Layout string `option:"layout" help:"use this backend layout (default: auto-detect)"` + + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"` +} + +// NewConfig returns a new Config with the default values filled in. +func NewConfig() Config { + return Config{ + Connections: 20, + } } func init() { @@ -70,10 +79,10 @@ func createConfig(endpoint string, p []string, useHTTP bool) (interface{}, error default: prefix = path.Clean(p[1]) } - return Config{ - Endpoint: endpoint, - UseHTTP: useHTTP, - Bucket: p[0], - Prefix: prefix, - }, nil + cfg := NewConfig() + cfg.Endpoint = endpoint + cfg.UseHTTP = useHTTP + cfg.Bucket = p[0] + cfg.Prefix = prefix + return cfg, nil } diff --git a/src/restic/backend/s3/config_test.go b/src/restic/backend/s3/config_test.go index 3a04d59a2..67611f3cc 100644 --- a/src/restic/backend/s3/config_test.go +++ b/src/restic/backend/s3/config_test.go @@ -7,78 +7,92 @@ var configTests = []struct { cfg Config }{ {"s3://eu-central-1/bucketname", Config{ - Endpoint: "eu-central-1", - Bucket: "bucketname", - Prefix: "restic", + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "restic", + Connections: 20, }}, {"s3://eu-central-1/bucketname/", Config{ - Endpoint: "eu-central-1", - Bucket: "bucketname", - Prefix: "restic", + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "restic", + Connections: 20, }}, {"s3://eu-central-1/bucketname/prefix/directory", Config{ - Endpoint: "eu-central-1", - Bucket: "bucketname", - Prefix: "prefix/directory", + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "prefix/directory", + Connections: 20, }}, {"s3://eu-central-1/bucketname/prefix/directory/", Config{ - Endpoint: "eu-central-1", - Bucket: "bucketname", - Prefix: "prefix/directory", + Endpoint: "eu-central-1", + Bucket: "bucketname", + Prefix: "prefix/directory", + Connections: 20, }}, {"s3:eu-central-1/foobar", Config{ - Endpoint: "eu-central-1", - Bucket: "foobar", - Prefix: "restic", + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "restic", + Connections: 20, }}, {"s3:eu-central-1/foobar/", Config{ - Endpoint: "eu-central-1", - Bucket: "foobar", - Prefix: "restic", + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "restic", + Connections: 20, }}, {"s3:eu-central-1/foobar/prefix/directory", Config{ - Endpoint: "eu-central-1", - Bucket: "foobar", - Prefix: "prefix/directory", + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "prefix/directory", + Connections: 20, }}, {"s3:eu-central-1/foobar/prefix/directory/", Config{ - Endpoint: "eu-central-1", - Bucket: "foobar", - Prefix: "prefix/directory", + Endpoint: "eu-central-1", + Bucket: "foobar", + Prefix: "prefix/directory", + Connections: 20, }}, {"s3:https://hostname:9999/foobar", Config{ - Endpoint: "hostname:9999", - Bucket: "foobar", - Prefix: "restic", + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + Connections: 20, }}, {"s3:https://hostname:9999/foobar/", Config{ - Endpoint: "hostname:9999", - Bucket: "foobar", - Prefix: "restic", + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + Connections: 20, }}, {"s3:http://hostname:9999/foobar", Config{ - Endpoint: "hostname:9999", - Bucket: "foobar", - Prefix: "restic", - UseHTTP: true, + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + UseHTTP: true, + Connections: 20, }}, {"s3:http://hostname:9999/foobar/", Config{ - Endpoint: "hostname:9999", - Bucket: "foobar", - Prefix: "restic", - UseHTTP: true, + Endpoint: "hostname:9999", + Bucket: "foobar", + Prefix: "restic", + UseHTTP: true, + Connections: 20, }}, {"s3:http://hostname:9999/bucket/prefix/directory", Config{ - Endpoint: "hostname:9999", - Bucket: "bucket", - Prefix: "prefix/directory", - UseHTTP: true, + Endpoint: "hostname:9999", + Bucket: "bucket", + Prefix: "prefix/directory", + UseHTTP: true, + Connections: 20, }}, {"s3:http://hostname:9999/bucket/prefix/directory/", Config{ - Endpoint: "hostname:9999", - Bucket: "bucket", - Prefix: "prefix/directory", - UseHTTP: true, + Endpoint: "hostname:9999", + Bucket: "bucket", + Prefix: "prefix/directory", + UseHTTP: true, + Connections: 20, }}, } diff --git a/src/restic/backend/s3/s3.go b/src/restic/backend/s3/s3.go index 960258392..93b476b86 100644 --- a/src/restic/backend/s3/s3.go +++ b/src/restic/backend/s3/s3.go @@ -23,7 +23,7 @@ const connLimit = 10 // s3 is a backend which stores the data on an S3 endpoint. type s3 struct { client *minio.Client - connChan chan struct{} + sem *backend.Semaphore bucketname string prefix string cacheMutex sync.RWMutex @@ -43,8 +43,14 @@ func Open(cfg Config) (restic.Backend, error) { return nil, errors.Wrap(err, "minio.New") } + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + be := &s3{ client: client, + sem: sem, bucketname: cfg.Bucket, prefix: cfg.Prefix, cacheObjSize: make(map[string]int64), @@ -59,8 +65,6 @@ func Open(cfg Config) (restic.Backend, error) { be.Layout = l - be.createConnections() - found, err := client.BucketExists(cfg.Bucket) if err != nil { debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err) @@ -78,13 +82,6 @@ func Open(cfg Config) (restic.Backend, error) { return be, nil } -func (be *s3) createConnections() { - be.connChan = make(chan struct{}, connLimit) - for i := 0; i < connLimit; i++ { - be.connChan <- struct{}{} - } -} - // IsNotExist returns true if the error is caused by a not existing file. func (be *s3) IsNotExist(err error) bool { debug.Log("IsNotExist(%T, %#v)", err, err) @@ -222,7 +219,7 @@ func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) { return errors.New("key already exists") } - <-be.connChan + be.sem.GetToken() // wrap the reader so that net/http client cannot close the reader, return // the token instead. @@ -234,11 +231,10 @@ func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) { } debug.Log("PutObject(%v, %v)", be.bucketname, objName) - coreClient := minio.Core{be.client} + coreClient := minio.Core{Client: be.client} info, err := coreClient.PutObject(be.bucketname, objName, size, rd, nil, nil, nil) - // return token - be.connChan <- struct{}{} + be.sem.ReleaseToken() debug.Log("%v -> %v bytes, err %#v", objName, info.Size, err) return errors.Wrap(err, "client.PutObject") @@ -275,8 +271,7 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er objName := be.Filename(h) - // get token for connection - <-be.connChan + be.sem.GetToken() byteRange := fmt.Sprintf("bytes=%d-", offset) if length > 0 { @@ -286,11 +281,10 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er headers.Add("Range", byteRange) debug.Log("Load(%v) send range %v", h, byteRange) - coreClient := minio.Core{be.client} + coreClient := minio.Core{Client: be.client} rd, _, err := coreClient.GetObject(be.bucketname, objName, headers) if err != nil { - // return token - be.connChan <- struct{}{} + be.sem.ReleaseToken() return nil, err } @@ -298,8 +292,7 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er ReadCloser: rd, f: func() { debug.Log("Close()") - // return token - be.connChan <- struct{}{} + be.sem.ReleaseToken() }, } diff --git a/src/restic/backend/s3/s3_test.go b/src/restic/backend/s3/s3_test.go index 787166994..ea453cf13 100644 --- a/src/restic/backend/s3/s3_test.go +++ b/src/restic/backend/s3/s3_test.go @@ -114,14 +114,13 @@ func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite { key, secret := newRandomCredentials(t) cfg.stopServer = runMinio(ctx, t, cfg.tempdir, key, secret) - cfg.Config = s3.Config{ - Endpoint: "localhost:9000", - Bucket: "restictestbucket", - Prefix: fmt.Sprintf("test-%d", time.Now().UnixNano()), - UseHTTP: true, - KeyID: key, - Secret: secret, - } + cfg.Config = s3.NewConfig() + cfg.Config.Endpoint = "localhost:9000" + cfg.Config.Bucket = "restictestbucket" + cfg.Config.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano()) + cfg.Config.UseHTTP = true + cfg.Config.KeyID = key + cfg.Config.Secret = secret return cfg, nil },