s3: Use semaphore

This commit is contained in:
Alexander Neumann 2017-06-06 00:17:39 +02:00
parent 5010e95c23
commit 683ebef6c6
4 changed files with 96 additions and 81 deletions

View File

@ -18,6 +18,15 @@ type Config struct {
Bucket string Bucket string
Prefix string Prefix string
Layout string `option:"layout" help:"use this backend layout (default: auto-detect)"` Layout string `option:"layout" help:"use this backend layout (default: auto-detect)"`
Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"`
}
// NewConfig returns a new Config with the default values filled in.
func NewConfig() Config {
return Config{
Connections: 20,
}
} }
func init() { func init() {
@ -70,10 +79,10 @@ func createConfig(endpoint string, p []string, useHTTP bool) (interface{}, error
default: default:
prefix = path.Clean(p[1]) prefix = path.Clean(p[1])
} }
return Config{ cfg := NewConfig()
Endpoint: endpoint, cfg.Endpoint = endpoint
UseHTTP: useHTTP, cfg.UseHTTP = useHTTP
Bucket: p[0], cfg.Bucket = p[0]
Prefix: prefix, cfg.Prefix = prefix
}, nil return cfg, nil
} }

View File

@ -7,78 +7,92 @@ var configTests = []struct {
cfg Config cfg Config
}{ }{
{"s3://eu-central-1/bucketname", Config{ {"s3://eu-central-1/bucketname", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/", Config{ {"s3://eu-central-1/bucketname/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/prefix/directory", Config{ {"s3://eu-central-1/bucketname/prefix/directory", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3://eu-central-1/bucketname/prefix/directory/", Config{ {"s3://eu-central-1/bucketname/prefix/directory/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "bucketname", Bucket: "bucketname",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar", Config{ {"s3:eu-central-1/foobar", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/", Config{ {"s3:eu-central-1/foobar/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/prefix/directory", Config{ {"s3:eu-central-1/foobar/prefix/directory", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:eu-central-1/foobar/prefix/directory/", Config{ {"s3:eu-central-1/foobar/prefix/directory/", Config{
Endpoint: "eu-central-1", Endpoint: "eu-central-1",
Bucket: "foobar", Bucket: "foobar",
Prefix: "prefix/directory", Prefix: "prefix/directory",
Connections: 20,
}}, }},
{"s3:https://hostname:9999/foobar", Config{ {"s3:https://hostname:9999/foobar", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:https://hostname:9999/foobar/", Config{ {"s3:https://hostname:9999/foobar/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
Connections: 20,
}}, }},
{"s3:http://hostname:9999/foobar", Config{ {"s3:http://hostname:9999/foobar", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/foobar/", Config{ {"s3:http://hostname:9999/foobar/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "foobar", Bucket: "foobar",
Prefix: "restic", Prefix: "restic",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/bucket/prefix/directory", Config{ {"s3:http://hostname:9999/bucket/prefix/directory", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "bucket", Bucket: "bucket",
Prefix: "prefix/directory", Prefix: "prefix/directory",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
{"s3:http://hostname:9999/bucket/prefix/directory/", Config{ {"s3:http://hostname:9999/bucket/prefix/directory/", Config{
Endpoint: "hostname:9999", Endpoint: "hostname:9999",
Bucket: "bucket", Bucket: "bucket",
Prefix: "prefix/directory", Prefix: "prefix/directory",
UseHTTP: true, UseHTTP: true,
Connections: 20,
}}, }},
} }

View File

@ -23,7 +23,7 @@ const connLimit = 10
// s3 is a backend which stores the data on an S3 endpoint. // s3 is a backend which stores the data on an S3 endpoint.
type s3 struct { type s3 struct {
client *minio.Client client *minio.Client
connChan chan struct{} sem *backend.Semaphore
bucketname string bucketname string
prefix string prefix string
cacheMutex sync.RWMutex cacheMutex sync.RWMutex
@ -43,8 +43,14 @@ func Open(cfg Config) (restic.Backend, error) {
return nil, errors.Wrap(err, "minio.New") return nil, errors.Wrap(err, "minio.New")
} }
sem, err := backend.NewSemaphore(cfg.Connections)
if err != nil {
return nil, err
}
be := &s3{ be := &s3{
client: client, client: client,
sem: sem,
bucketname: cfg.Bucket, bucketname: cfg.Bucket,
prefix: cfg.Prefix, prefix: cfg.Prefix,
cacheObjSize: make(map[string]int64), cacheObjSize: make(map[string]int64),
@ -59,8 +65,6 @@ func Open(cfg Config) (restic.Backend, error) {
be.Layout = l be.Layout = l
be.createConnections()
found, err := client.BucketExists(cfg.Bucket) found, err := client.BucketExists(cfg.Bucket)
if err != nil { if err != nil {
debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err) debug.Log("BucketExists(%v) returned err %v", cfg.Bucket, err)
@ -78,13 +82,6 @@ func Open(cfg Config) (restic.Backend, error) {
return be, nil return be, nil
} }
func (be *s3) createConnections() {
be.connChan = make(chan struct{}, connLimit)
for i := 0; i < connLimit; i++ {
be.connChan <- struct{}{}
}
}
// IsNotExist returns true if the error is caused by a not existing file. // IsNotExist returns true if the error is caused by a not existing file.
func (be *s3) IsNotExist(err error) bool { func (be *s3) IsNotExist(err error) bool {
debug.Log("IsNotExist(%T, %#v)", err, err) debug.Log("IsNotExist(%T, %#v)", err, err)
@ -222,7 +219,7 @@ func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) {
return errors.New("key already exists") return errors.New("key already exists")
} }
<-be.connChan be.sem.GetToken()
// wrap the reader so that net/http client cannot close the reader, return // wrap the reader so that net/http client cannot close the reader, return
// the token instead. // the token instead.
@ -234,11 +231,10 @@ func (be *s3) Save(h restic.Handle, rd io.Reader) (err error) {
} }
debug.Log("PutObject(%v, %v)", be.bucketname, objName) debug.Log("PutObject(%v, %v)", be.bucketname, objName)
coreClient := minio.Core{be.client} coreClient := minio.Core{Client: be.client}
info, err := coreClient.PutObject(be.bucketname, objName, size, rd, nil, nil, nil) info, err := coreClient.PutObject(be.bucketname, objName, size, rd, nil, nil, nil)
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
debug.Log("%v -> %v bytes, err %#v", objName, info.Size, err) debug.Log("%v -> %v bytes, err %#v", objName, info.Size, err)
return errors.Wrap(err, "client.PutObject") return errors.Wrap(err, "client.PutObject")
@ -275,8 +271,7 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er
objName := be.Filename(h) objName := be.Filename(h)
// get token for connection be.sem.GetToken()
<-be.connChan
byteRange := fmt.Sprintf("bytes=%d-", offset) byteRange := fmt.Sprintf("bytes=%d-", offset)
if length > 0 { if length > 0 {
@ -286,11 +281,10 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er
headers.Add("Range", byteRange) headers.Add("Range", byteRange)
debug.Log("Load(%v) send range %v", h, byteRange) debug.Log("Load(%v) send range %v", h, byteRange)
coreClient := minio.Core{be.client} coreClient := minio.Core{Client: be.client}
rd, _, err := coreClient.GetObject(be.bucketname, objName, headers) rd, _, err := coreClient.GetObject(be.bucketname, objName, headers)
if err != nil { if err != nil {
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
return nil, err return nil, err
} }
@ -298,8 +292,7 @@ func (be *s3) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, er
ReadCloser: rd, ReadCloser: rd,
f: func() { f: func() {
debug.Log("Close()") debug.Log("Close()")
// return token be.sem.ReleaseToken()
be.connChan <- struct{}{}
}, },
} }

View File

@ -114,14 +114,13 @@ func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite {
key, secret := newRandomCredentials(t) key, secret := newRandomCredentials(t)
cfg.stopServer = runMinio(ctx, t, cfg.tempdir, key, secret) cfg.stopServer = runMinio(ctx, t, cfg.tempdir, key, secret)
cfg.Config = s3.Config{ cfg.Config = s3.NewConfig()
Endpoint: "localhost:9000", cfg.Config.Endpoint = "localhost:9000"
Bucket: "restictestbucket", cfg.Config.Bucket = "restictestbucket"
Prefix: fmt.Sprintf("test-%d", time.Now().UnixNano()), cfg.Config.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano())
UseHTTP: true, cfg.Config.UseHTTP = true
KeyID: key, cfg.Config.KeyID = key
Secret: secret, cfg.Config.Secret = secret
}
return cfg, nil return cfg, nil
}, },