diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go
index c50b3413e..8e3c5b53d 100644
--- a/cmd/restic/cmd_prune.go
+++ b/cmd/restic/cmd_prune.go
@@ -179,6 +179,12 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error {
 	}
 	bar.Done()
 
+	if len(usedBlobs) > stats.blobs {
+		return errors.Fatalf("number of used blobs is larger than number of available blobs!\n" +
+			"Please report this error (along with the output of the 'prune' run) at\n" +
+			"https://github.com/restic/restic/issues/new")
+	}
+
 	Verbosef("found %d of %d data blobs still in use, removing %d blobs\n",
 		len(usedBlobs), stats.blobs, stats.blobs-len(usedBlobs))
 
diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go
index a59f1e1eb..6fad216bf 100644
--- a/internal/backend/azure/azure.go
+++ b/internal/backend/azure/azure.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"path"
 	"strings"
-	"time"
 
 	"github.com/Azure/azure-sdk-for-go/storage"
 	"github.com/restic/restic/internal/backend"
@@ -18,13 +17,16 @@ import (
 
 // Backend stores data on an azure endpoint.
 type Backend struct {
-	accountName string
-	container   *storage.Container
-	sem         *backend.Semaphore
-	prefix      string
+	accountName  string
+	container    *storage.Container
+	sem          *backend.Semaphore
+	prefix       string
+	listMaxItems int
 	backend.Layout
 }
 
+const defaultListMaxItems = 5000
+
 // make sure that *Backend implements backend.Backend
 var _ restic.Backend = &Backend{}
 
@@ -54,6 +56,7 @@ func open(cfg Config) (*Backend, error) {
 			Path: cfg.Prefix,
 			Join: path.Join,
 		},
+		listMaxItems: defaultListMaxItems,
 	}
 
 	return be, nil
@@ -85,6 +88,11 @@ func Create(cfg Config) (restic.Backend, error) {
 	return be, nil
 }
 
+// SetListMaxItems sets the number of list items to load per request.
+func (be *Backend) SetListMaxItems(i int) {
+	be.listMaxItems = i
+}
+
 // IsNotExist returns true if the error is caused by a not existing file.
 func (be *Backend) IsNotExist(err error) bool {
 	debug.Log("IsNotExist(%T, %#v)", err, err)
@@ -96,62 +104,6 @@ func (be *Backend) Join(p ...string) string {
 	return path.Join(p...)
 }
 
-type fileInfo struct {
-	name    string
-	size    int64
-	mode    os.FileMode
-	modTime time.Time
-	isDir   bool
-}
-
-func (fi fileInfo) Name() string       { return fi.name }    // base name of the file
-func (fi fileInfo) Size() int64        { return fi.size }    // length in bytes for regular files; system-dependent for others
-func (fi fileInfo) Mode() os.FileMode  { return fi.mode }    // file mode bits
-func (fi fileInfo) ModTime() time.Time { return fi.modTime } // modification time
-func (fi fileInfo) IsDir() bool        { return fi.isDir }   // abbreviation for Mode().IsDir()
-func (fi fileInfo) Sys() interface{}   { return nil }        // underlying data source (can return nil)
-
-// ReadDir returns the entries for a directory.
-func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) {
-	debug.Log("ReadDir(%v)", dir)
-
-	// make sure dir ends with a slash
-	if dir[len(dir)-1] != '/' {
-		dir += "/"
-	}
-
-	obj, err := be.container.ListBlobs(storage.ListBlobsParameters{Prefix: dir, Delimiter: "/"})
-	if err != nil {
-		return nil, err
-	}
-
-	for _, item := range obj.BlobPrefixes {
-		entry := fileInfo{
-			name:  strings.TrimPrefix(item, dir),
-			isDir: true,
-			mode:  os.ModeDir | 0755,
-		}
-		if entry.name != "" {
-			list = append(list, entry)
-		}
-	}
-
-	for _, item := range obj.Blobs {
-		entry := fileInfo{
-			name:    strings.TrimPrefix(item.Name, dir),
-			isDir:   false,
-			mode:    0644,
-			size:    item.Properties.ContentLength,
-			modTime: time.Time(item.Properties.LastModified),
-		}
-		if entry.name != "" {
-			list = append(list, entry)
-		}
-	}
-
-	return list, nil
-}
-
 // Location returns this backend's location (the container name).
 func (be *Backend) Location() string {
 	return be.Join(be.container.Name, be.prefix)
@@ -321,25 +273,39 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
 		prefix += "/"
 	}
 
+	params := storage.ListBlobsParameters{
+		MaxResults: uint(be.listMaxItems),
+		Prefix:     prefix,
+	}
+
 	go func() {
 		defer close(ch)
 
-		obj, err := be.container.ListBlobs(storage.ListBlobsParameters{Prefix: prefix})
-		if err != nil {
-			return
-		}
-
-		for _, item := range obj.Blobs {
-			m := strings.TrimPrefix(item.Name, prefix)
-			if m == "" {
-				continue
-			}
-
-			select {
-			case ch <- path.Base(m):
-			case <-ctx.Done():
+		for {
+			obj, err := be.container.ListBlobs(params)
+			if err != nil {
 				return
 			}
+
+			debug.Log("got %v objects", len(obj.Blobs))
+
+			for _, item := range obj.Blobs {
+				m := strings.TrimPrefix(item.Name, prefix)
+				if m == "" {
+					continue
+				}
+
+				select {
+				case ch <- path.Base(m):
+				case <-ctx.Done():
+					return
+				}
+			}
+
+			if obj.NextMarker == "" {
+				break
+			}
+			params.Marker = obj.NextMarker
 		}
 	}()
 
diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go
index 94a13222b..8fd148769 100644
--- a/internal/backend/b2/b2.go
+++ b/internal/backend/b2/b2.go
@@ -16,13 +16,16 @@ import (
 
 // b2Backend is a backend which stores its data on Backblaze B2.
 type b2Backend struct {
-	client *b2.Client
-	bucket *b2.Bucket
-	cfg    Config
+	client       *b2.Client
+	bucket       *b2.Bucket
+	cfg          Config
+	listMaxItems int
 	backend.Layout
 	sem *backend.Semaphore
 }
 
+const defaultListMaxItems = 1000
+
 // ensure statically that *b2Backend implements restic.Backend.
 var _ restic.Backend = &b2Backend{}
 
@@ -121,6 +124,11 @@ func Create(cfg Config) (restic.Backend, error) {
 	return be, nil
 }
 
+// SetListMaxItems sets the number of list items to load per request.
+func (be *b2Backend) SetListMaxItems(i int) {
+	be.listMaxItems = i
+}
+
 // Location returns the location for the backend.
 func (be *b2Backend) Location() string {
 	return be.cfg.Bucket
@@ -307,10 +315,19 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType) <-chan string 
 		cur := &b2.Cursor{Prefix: prefix}
 
+		// listMaxItems is zero unless SetListMaxItems has been called, so
+		// fall back to the default page size instead of requesting zero
+		// items per call.
+		maxItems := be.listMaxItems
+		if maxItems <= 0 {
+			maxItems = defaultListMaxItems
+		}
+
 		for {
-			objs, c, err := be.bucket.ListCurrentObjects(ctx, 1000, cur)
+			objs, c, err := be.bucket.ListCurrentObjects(ctx, maxItems, cur)
 			if err != nil && err != io.EOF {
 				return
 			}
+			debug.Log("returned %v items", len(objs))
 			for _, obj := range objs {
 				// Skip objects returned that do not have the specified prefix.
 				if !strings.HasPrefix(obj.Name(), prefix) {
diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go
index 3fc6cf1f7..902726d1b 100644
--- a/internal/backend/gs/gs.go
+++ b/internal/backend/gs/gs.go
@@ -7,7 +7,6 @@ import (
 	"os"
 	"path"
 	"strings"
-	"time"
 
 	"github.com/pkg/errors"
 	"github.com/restic/restic/internal/backend"
@@ -23,11 +22,12 @@ import (
 
 // Backend stores data on an gs endpoint.
 type Backend struct {
-	service    *storage.Service
-	projectID  string
-	sem        *backend.Semaphore
-	bucketName string
-	prefix     string
+	service      *storage.Service
+	projectID    string
+	sem          *backend.Semaphore
+	bucketName   string
+	prefix       string
+	listMaxItems int
 	backend.Layout
 }
 
@@ -56,6 +56,8 @@ func getStorageService(jsonKeyPath string) (*storage.Service, error) {
 	return service, nil
 }
 
+const defaultListMaxItems = 1000
+
 func open(cfg Config) (*Backend, error) {
 	debug.Log("open, config %#v", cfg)
 
@@ -79,6 +81,7 @@ func open(cfg Config) (*Backend, error) {
 			Path: cfg.Prefix,
 			Join: path.Join,
 		},
+		listMaxItems: defaultListMaxItems,
 	}
 
 	return be, nil
@@ -112,6 +115,11 @@ func Create(cfg Config) (restic.Backend, error) {
 	return be, nil
 }
 
+// SetListMaxItems sets the number of list items to load per request.
+func (be *Backend) SetListMaxItems(i int) {
+	be.listMaxItems = i
+}
+
 // IsNotExist returns true if the error is caused by a not existing file.
 func (be *Backend) IsNotExist(err error) bool {
 	debug.Log("IsNotExist(%T, %#v)", err, err)
@@ -134,59 +142,6 @@ func (be *Backend) Join(p ...string) string {
 	return path.Join(p...)
 }
 
-type fileInfo struct {
-	name    string
-	size    int64
-	mode    os.FileMode
-	modTime time.Time
-	isDir   bool
-}
-
-func (fi fileInfo) Name() string       { return fi.name }    // base name of the file
-func (fi fileInfo) Size() int64        { return fi.size }    // length in bytes for regular files; system-dependent for others
-func (fi fileInfo) Mode() os.FileMode  { return fi.mode }    // file mode bits
-func (fi fileInfo) ModTime() time.Time { return fi.modTime } // modification time
-func (fi fileInfo) IsDir() bool        { return fi.isDir }   // abbreviation for Mode().IsDir()
-func (fi fileInfo) Sys() interface{}   { return nil }        // underlying data source (can return nil)
-
-// ReadDir returns the entries for a directory.
-func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) {
-	debug.Log("ReadDir(%v)", dir)
-
-	// make sure dir ends with a slash
-	if dir[len(dir)-1] != '/' {
-		dir += "/"
-	}
-
-	obj, err := be.service.Objects.List(be.bucketName).Prefix(dir).Delimiter("/").Do()
-	if err != nil {
-		return nil, err
-	}
-
-	for _, item := range obj.Prefixes {
-		entry := fileInfo{
-			name:  strings.TrimPrefix(item, dir),
-			isDir: true,
-			mode:  os.ModeDir | 0755,
-		}
-		list = append(list, entry)
-	}
-	for _, item := range obj.Items {
-		entry := fileInfo{
-			name:  strings.TrimPrefix(item.Name, dir),
-			isDir: false,
-			mode:  0644,
-			size:  int64(item.Size),
-			//modTime: item.Updated,
-		}
-		if entry.name != "" {
-			list = append(list, entry)
-		}
-	}
-
-	return list, nil
-}
-
 // Location returns this backend's location (the bucket name).
 func (be *Backend) Location() string {
 	return be.Join(be.bucketName, be.prefix)
@@ -352,22 +307,33 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
 	go func() {
 		defer close(ch)
 
-		obj, err := be.service.Objects.List(be.bucketName).Prefix(prefix).Do()
-		if err != nil {
-			return
-		}
-
-		for _, item := range obj.Items {
-			m := strings.TrimPrefix(item.Name, prefix)
-			if m == "" {
-				continue
-			}
-
-			select {
-			case ch <- path.Base(m):
-			case <-ctx.Done():
+		listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems))
+		for {
+			obj, err := listReq.Do()
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "error listing %v: %v\n", prefix, err)
 				return
 			}
+
+			debug.Log("returned %v items", len(obj.Items))
+
+			for _, item := range obj.Items {
+				m := strings.TrimPrefix(item.Name, prefix)
+				if m == "" {
+					continue
+				}
+
+				select {
+				case ch <- path.Base(m):
+				case <-ctx.Done():
+					return
+				}
+			}
+
+			if obj.NextPageToken == "" {
+				break
+			}
+			listReq.PageToken(obj.NextPageToken)
 		}
 	}()
 
diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go
index 64907f2f7..e32e2c421 100644
--- a/internal/backend/test/tests.go
+++ b/internal/backend/test/tests.go
@@ -240,6 +240,78 @@ func (s *Suite) TestLoad(t *testing.T) {
 	test.OK(t, b.Remove(context.TODO(), handle))
 }
 
+// TestList makes sure that the backend implements List() pagination correctly.
+func (s *Suite) TestList(t *testing.T) {
+	seedRand(t)
+
+	numTestFiles := rand.Intn(20) + 20
+
+	b := s.open(t)
+	defer s.close(t, b)
+
+	list1 := restic.NewIDSet()
+
+	for i := 0; i < numTestFiles; i++ {
+		data := []byte(fmt.Sprintf("random test blob %v", i))
+		id := restic.Hash(data)
+		h := restic.Handle{Type: restic.DataFile, Name: id.String()}
+		err := b.Save(context.TODO(), h, bytes.NewReader(data))
+		if err != nil {
+			t.Fatal(err)
+		}
+		list1.Insert(id)
+	}
+
+	t.Logf("wrote %v files", len(list1))
+
+	var tests = []struct {
+		maxItems int
+	}{
+		{11}, {23}, {numTestFiles}, {numTestFiles + 10}, {numTestFiles + 1123},
+	}
+
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("max-%v", test.maxItems), func(t *testing.T) {
+			list2 := restic.NewIDSet()
+
+			type setter interface {
+				SetListMaxItems(int)
+			}
+
+			if s, ok := b.(setter); ok {
+				t.Logf("setting max list items to %d", test.maxItems)
+				s.SetListMaxItems(test.maxItems)
+			}
+
+			for name := range b.List(context.TODO(), restic.DataFile) {
+				id, err := restic.ParseID(name)
+				if err != nil {
+					t.Fatal(err)
+				}
+				list2.Insert(id)
+			}
+
+			t.Logf("loaded %v IDs from backend", len(list2))
+
+			if !list1.Equals(list2) {
+				t.Errorf("lists are not equal, list1 %d entries, list2 %d entries",
+					len(list1), len(list2))
+			}
+		})
+	}
+
+	t.Logf("remove %d files", numTestFiles)
+	handles := make([]restic.Handle, 0, len(list1))
+	for id := range list1 {
+		handles = append(handles, restic.Handle{Type: restic.DataFile, Name: id.String()})
+	}
+
+	err := s.delayedRemove(t, b, handles...)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 type errorCloser struct {
 	io.Reader
 	l int