Merge pull request #4802 from MichaelEischer/backend-cleanups

Repository: Remove Backend() method
This commit is contained in:
Michael Eischer 2024-05-18 22:02:45 +02:00 committed by GitHub
commit 1dfe1b8732
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
52 changed files with 614 additions and 648 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui"
)
@ -347,7 +348,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
for err := range errChan {
errorsFound = true
Warnf("%v\n", err)
if err, ok := err.(*checker.ErrPackData); ok {
if err, ok := err.(*repository.ErrPackData); ok {
salvagePacks = append(salvagePacks, err.PackID)
}
}

View File

@ -20,7 +20,6 @@ import (
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/index"
@ -475,21 +474,12 @@ func runDebugExamine(ctx context.Context, gopts GlobalOptions, opts DebugExamine
func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repository, id restic.ID) error {
Printf("examine %v\n", id)
h := backend.Handle{
Type: restic.PackFile,
Name: id.String(),
}
fi, err := repo.Backend().Stat(ctx, h)
if err != nil {
return err
}
Printf(" file size is %v\n", fi.Size)
buf, err := repo.LoadRaw(ctx, restic.PackFile, id)
// also process damaged pack files
if buf == nil {
return err
}
Printf(" file size is %v\n", len(buf))
gotID := restic.Hash(buf)
if !id.Equal(gotID) {
Printf(" wanted hash %v, got %v\n", id, gotID)
@ -508,7 +498,7 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
continue
}
checkPackSize(blobs, fi.Size)
checkPackSize(blobs, len(buf))
err = loadBlobs(ctx, opts, repo, id, blobs)
if err != nil {
@ -521,11 +511,11 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
Printf(" ========================================\n")
Printf(" inspect the pack itself\n")
blobs, _, err := repo.ListPack(ctx, id, fi.Size)
blobs, _, err := repo.ListPack(ctx, id, int64(len(buf)))
if err != nil {
return fmt.Errorf("pack %v: %v", id.Str(), err)
}
checkPackSize(blobs, fi.Size)
checkPackSize(blobs, len(buf))
if !blobsLoaded {
return loadBlobs(ctx, opts, repo, id, blobs)
@ -533,7 +523,7 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
return nil
}
func checkPackSize(blobs []restic.Blob, fileSize int64) {
func checkPackSize(blobs []restic.Blob, fileSize int) {
// track current size and offset
var size, offset uint64

View File

@ -285,10 +285,6 @@ func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots r
err = restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar)
if err != nil {
if repo.Backend().IsNotExist(err) {
return nil, errors.Fatal("unable to load a tree from the repository: " + err.Error())
}
return nil, err
}
return usedBlobs, nil

View File

@ -8,7 +8,6 @@ import (
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
@ -181,8 +180,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r
if dryRun {
Verbosef("would delete empty snapshot\n")
} else {
h := backend.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
if err = repo.Backend().Remove(ctx, h); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
return false, err
}
debug.Log("removed empty snapshot %v", sn.ID())
@ -241,8 +239,7 @@ func filterAndReplaceSnapshot(ctx context.Context, repo restic.Repository, sn *r
Verbosef("saved new snapshot %v\n", id.Str())
if forget {
h := backend.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
if err = repo.Backend().Remove(ctx, h); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
return false, err
}
debug.Log("removed old snapshot %v", sn.ID())

View File

@ -5,7 +5,6 @@ import (
"github.com/spf13/cobra"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
@ -86,8 +85,7 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna
debug.Log("new snapshot saved as %v", id)
// Remove the old snapshot.
h := backend.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
if err = repo.Backend().Remove(ctx, h); err != nil {
if err = repo.RemoveUnpacked(ctx, restic.SnapshotFile, *sn.ID()); err != nil {
return false, err
}

View File

@ -267,7 +267,7 @@ func removePacks(gopts GlobalOptions, t testing.TB, remove restic.IDSet) {
defer unlock()
for id := range remove {
rtest.OK(t, r.Backend().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}))
rtest.OK(t, r.RemoveUnpacked(ctx, restic.PackFile, id))
}
}
@ -291,7 +291,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
if treePacks.Has(id) != removeTreePacks || keep.Has(id) {
return nil
}
return r.Backend().Remove(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()})
return r.RemoveUnpacked(ctx, restic.PackFile, id)
}))
}

View File

@ -1970,7 +1970,7 @@ func TestArchiverContextCanceled(t *testing.T) {
})
// Ensure that the archiver itself reports the canceled context and not just the backend
repo := repository.TestRepositoryWithBackend(t, &noCancelBackend{mem.New()}, 0, repository.Options{})
repo, _ := repository.TestRepositoryWithBackend(t, &noCancelBackend{mem.New()}, 0, repository.Options{})
back := rtest.Chdir(t, tempdir)
defer back()

View File

@ -190,11 +190,6 @@ func (be *Backend) Connections() uint {
return be.connections
}
// Location returns this backend's location (the container name).
func (be *Backend) Location() string {
return be.Join(be.cfg.AccountName, be.cfg.Prefix)
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *Backend) Hasher() hash.Hash {
return md5.New()

View File

@ -162,11 +162,6 @@ func (be *b2Backend) Connections() uint {
return be.cfg.Connections
}
// Location returns the location for the backend.
func (be *b2Backend) Location() string {
return be.cfg.Bucket
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *b2Backend) Hasher() hash.Hash {
return nil

View File

@ -14,10 +14,6 @@ import (
// the context package need not be wrapped, as context cancellation is checked
// separately by the retrying logic.
type Backend interface {
// Location returns a string that describes the type and location of the
// repository.
Location() string
// Connections returns the maximum number of concurrent backend operations.
Connections() uint

View File

@ -46,11 +46,6 @@ func (be *Backend) Connections() uint {
return be.b.Connections()
}
// Location returns the location of the backend.
func (be *Backend) Location() string {
return "DRY:" + be.b.Location()
}
// Delete removes all data in the backend.
func (be *Backend) Delete(_ context.Context) error {
return nil

View File

@ -36,7 +36,6 @@ func TestDry(t *testing.T) {
content string
wantErr string
}{
{d, "loc", "", "DRY:RAM", ""},
{d, "delete", "", "", ""},
{d, "stat", "a", "", "not found"},
{d, "list", "", "", ""},
@ -76,11 +75,6 @@ func TestDry(t *testing.T) {
if files != step.content {
t.Errorf("%d. List = %q, want %q", i, files, step.content)
}
case "loc":
loc := step.be.Location()
if loc != step.content {
t.Errorf("%d. Location = %q, want %q", i, loc, step.content)
}
case "delete":
err = step.be.Delete(ctx)
case "remove":

View File

@ -197,11 +197,6 @@ func (be *Backend) Connections() uint {
return be.connections
}
// Location returns this backend's location (the bucket name).
func (be *Backend) Location() string {
return be.Join(be.bucketName, be.prefix)
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *Backend) Hasher() hash.Hash {
return md5.New()

View File

@ -93,11 +93,6 @@ func (b *Local) Connections() uint {
return b.Config.Connections
}
// Location returns this backend's location (the directory name).
func (b *Local) Location() string {
return b.Path
}
// Hasher may return a hash function for calculating a content hash for the backend
func (b *Local) Hasher() hash.Hash {
return nil

View File

@ -222,11 +222,6 @@ func (be *MemoryBackend) Connections() uint {
return connectionCount
}
// Location returns the location of the backend (RAM).
func (be *MemoryBackend) Location() string {
return "RAM"
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *MemoryBackend) Hasher() hash.Hash {
return xxhash.New()

View File

@ -21,7 +21,6 @@ type Backend struct {
RemoveFn func(ctx context.Context, h backend.Handle) error
DeleteFn func(ctx context.Context) error
ConnectionsFn func() uint
LocationFn func() string
HasherFn func() hash.Hash
HasAtomicReplaceFn func() bool
}
@ -49,15 +48,6 @@ func (m *Backend) Connections() uint {
return m.ConnectionsFn()
}
// Location returns a location string.
func (m *Backend) Location() string {
if m.LocationFn == nil {
return ""
}
return m.LocationFn()
}
// Hasher may return a hash function for calculating a content hash for the backend
func (m *Backend) Hasher() hash.Hash {
if m.HasherFn == nil {

View File

@ -121,11 +121,6 @@ func (b *Backend) Connections() uint {
return b.connections
}
// Location returns this backend's location (the server's URL).
func (b *Backend) Location() string {
return b.url.String()
}
// Hasher may return a hash function for calculating a content hash for the backend
func (b *Backend) Hasher() hash.Hash {
return nil

View File

@ -321,11 +321,6 @@ func (be *Backend) Connections() uint {
return be.cfg.Connections
}
// Location returns this backend's location (the bucket name).
func (be *Backend) Location() string {
return be.Join(be.cfg.Bucket, be.cfg.Prefix)
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *Backend) Hasher() hash.Hash {
return nil

View File

@ -292,11 +292,6 @@ func (r *SFTP) Connections() uint {
return r.Config.Connections
}
// Location returns this backend's location (the directory name).
func (r *SFTP) Location() string {
return r.p
}
// Hasher may return a hash function for calculating a content hash for the backend
func (r *SFTP) Hasher() hash.Hash {
return nil

View File

@ -118,11 +118,6 @@ func (be *beSwift) Connections() uint {
return be.connections
}
// Location returns this backend's location (the container name).
func (be *beSwift) Location() string {
return be.container
}
// Hasher may return a hash function for calculating a content hash for the backend
func (be *beSwift) Hasher() hash.Hash {
return md5.New()

View File

@ -88,17 +88,6 @@ func (s *Suite[C]) TestCreateWithConfig(t *testing.T) {
}
}
// TestLocation tests that a location string is returned.
func (s *Suite[C]) TestLocation(t *testing.T) {
b := s.open(t)
defer s.close(t, b)
l := b.Location()
if l == "" {
t.Fatalf("invalid location string %q", l)
}
}
// TestConfig saves and loads a config from the backend.
func (s *Suite[C]) TestConfig(t *testing.T) {
b := s.open(t)

View File

@ -2,21 +2,16 @@ package checker
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"runtime"
"sort"
"sync"
"github.com/klauspost/compress/zstd"
"github.com/minio/sha256-simd"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/s3"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/repository"
@ -90,16 +85,6 @@ func (err *ErrOldIndexFormat) Error() string {
return fmt.Sprintf("index %v has old format", err.ID)
}
// ErrPackData is returned if errors are discovered while verifying a packfile
type ErrPackData struct {
PackID restic.ID
errs []error
}
func (e *ErrPackData) Error() string {
return fmt.Sprintf("pack %v contains %v errors: %v", e.PackID, len(e.errs), e.errs)
}
func (c *Checker) LoadSnapshots(ctx context.Context) error {
var err error
c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile)
@ -256,8 +241,10 @@ func isS3Legacy(b backend.Backend) bool {
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
defer close(errChan)
if isS3Legacy(c.repo.Backend()) {
errChan <- ErrLegacyLayout
if r, ok := c.repo.(*repository.Repository); ok {
if isS3Legacy(repository.AsS3Backend(r)) {
errChan <- ErrLegacyLayout
}
}
debug.Log("checking for %d packs", len(c.packs))
@ -522,182 +509,6 @@ func (c *Checker) GetPacks() map[restic.ID]int64 {
return c.packs
}
type partialReadError struct {
err error
}
func (e *partialReadError) Error() string {
return e.err.Error()
}
// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
if err != nil {
// retry pack verification to detect transient errors
err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
if err2 != nil {
err = err2
} else {
err = fmt.Errorf("check successful on second attempt, original error %w", err)
}
}
return err
}
func checkPackInner(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
debug.Log("checking pack %v", id.String())
if len(blobs) == 0 {
return &ErrPackData{PackID: id, errs: []error{errors.New("pack is empty or not indexed")}}
}
// sanity check blobs in index
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
idxHdrSize := pack.CalculateHeaderSize(blobs)
lastBlobEnd := 0
nonContinuousPack := false
for _, blob := range blobs {
if lastBlobEnd != int(blob.Offset) {
nonContinuousPack = true
}
lastBlobEnd = int(blob.Offset + blob.Length)
}
// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
var errs []error
if nonContinuousPack {
debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
errs = append(errs, errors.New("index for pack contains gaps / overlapping blobs"))
}
// calculate hash on-the-fly while reading the pack and capture pack header
var hash restic.ID
var hdrBuf []byte
h := backend.Handle{Type: backend.PackFile, Name: id.String()}
err := r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
break
} else if err != nil {
return &partialReadError{err}
}
debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle)
if val.Err != nil {
debug.Log(" error verifying blob %v: %v", val.Handle.ID, val.Err)
errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, val.Err))
}
}
// skip enough bytes until we reach the possible header start
curPos := lastBlobEnd
minHdrStart := int(size) - pack.MaxHeaderSize
if minHdrStart > curPos {
_, err := bufRd.Discard(minHdrStart - curPos)
if err != nil {
return &partialReadError{err}
}
curPos += minHdrStart - curPos
}
// read remainder, which should be the pack header
var err error
hdrBuf = make([]byte, int(size-int64(curPos)))
_, err = io.ReadFull(bufRd, hdrBuf)
if err != nil {
return &partialReadError{err}
}
hash = restic.IDFromHash(hrd.Sum(nil))
return nil
})
if err != nil {
var e *partialReadError
isPartialReadError := errors.As(err, &e)
// failed to load the pack file, return as further checks cannot succeed anyways
debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err)
if isPartialReadError {
return &ErrPackData{PackID: id, errs: append(errs, fmt.Errorf("partial download error: %w", err))}
}
// The check command suggests to repair files for which a `ErrPackData` is returned. However, this file
// completely failed to download such that there's no point in repairing anything.
return fmt.Errorf("download error: %w", err)
}
if !hash.Equal(id) {
debug.Log("pack ID does not match, want %v, got %v", id, hash)
return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("unexpected pack id %v", hash))}
}
blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf)))
if err != nil {
return &ErrPackData{PackID: id, errs: append(errs, err)}
}
if uint32(idxHdrSize) != hdrSize {
debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
}
idx := r.Index()
for _, blob := range blobs {
// Check if blob is contained in index and position is correct
idxHas := false
for _, pb := range idx.Lookup(blob.BlobHandle) {
if pb.PackID == id && pb.Blob == blob {
idxHas = true
break
}
}
if !idxHas {
errs = append(errs, errors.Errorf("blob %v is not contained in index or position is incorrect", blob.ID))
continue
}
}
if len(errs) > 0 {
return &ErrPackData{PackID: id, errs: errs}
}
return nil
}
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
@ -741,8 +552,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
}
}
err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec)
err := repository.CheckPack(ctx, c.repo.(*repository.Repository), ps.id, ps.blobs, ps.size, bufRd, dec)
p.Add(1)
if err == nil {
continue

View File

@ -73,7 +73,7 @@ func assertOnlyMixedPackHints(t *testing.T, hints []error) {
}
func TestCheckRepo(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, _, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
chkr := checker.New(repo, false)
@ -91,14 +91,11 @@ func TestCheckRepo(t *testing.T) {
}
func TestMissingPack(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, be, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
packHandle := backend.Handle{
Type: restic.PackFile,
Name: "657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6",
}
test.OK(t, repo.Backend().Remove(context.TODO(), packHandle))
packID := restic.TestParseID("657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6")
test.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: packID.String()}))
chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil)
@ -113,23 +110,20 @@ func TestMissingPack(t *testing.T) {
"expected exactly one error, got %v", len(errs))
if err, ok := errs[0].(*checker.PackError); ok {
test.Equals(t, packHandle.Name, err.ID.String())
test.Equals(t, packID, err.ID)
} else {
t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
}
}
func TestUnreferencedPack(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, be, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
// index 3f1a only references pack 60e0
packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"
indexHandle := backend.Handle{
Type: restic.IndexFile,
Name: "3f1abfcb79c6f7d0a3be517d2c83c8562fba64ef2c8e9a3544b4edaf8b5e3b44",
}
test.OK(t, repo.Backend().Remove(context.TODO(), indexHandle))
indexID := restic.TestParseID("3f1abfcb79c6f7d0a3be517d2c83c8562fba64ef2c8e9a3544b4edaf8b5e3b44")
test.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: indexID.String()}))
chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil)
@ -151,14 +145,11 @@ func TestUnreferencedPack(t *testing.T) {
}
func TestUnreferencedBlobs(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, _, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
snapshotHandle := backend.Handle{
Type: restic.SnapshotFile,
Name: "51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02",
}
test.OK(t, repo.Backend().Remove(context.TODO(), snapshotHandle))
snapshotID := restic.TestParseID("51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02")
test.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, snapshotID))
unusedBlobsBySnapshot := restic.BlobHandles{
restic.TestParseHandle("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849", restic.DataBlob),
@ -189,7 +180,7 @@ func TestUnreferencedBlobs(t *testing.T) {
}
func TestModifiedIndex(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, be, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
done := make(chan struct{})
@ -217,13 +208,13 @@ func TestModifiedIndex(t *testing.T) {
}()
wr := io.Writer(tmpfile)
var hw *hashing.Writer
if repo.Backend().Hasher() != nil {
hw = hashing.NewWriter(wr, repo.Backend().Hasher())
if be.Hasher() != nil {
hw = hashing.NewWriter(wr, be.Hasher())
wr = hw
}
// read the file from the backend
err = repo.Backend().Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
err = be.Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
_, err := io.Copy(wr, rd)
return err
})
@ -245,7 +236,7 @@ func TestModifiedIndex(t *testing.T) {
t.Fatal(err)
}
err = repo.Backend().Save(context.TODO(), h2, rd)
err = be.Save(context.TODO(), h2, rd)
if err != nil {
t.Fatal(err)
}
@ -266,7 +257,7 @@ func TestModifiedIndex(t *testing.T) {
var checkerDuplicateIndexTestData = filepath.Join("testdata", "duplicate-packs-in-index-test-repo.tar.gz")
func TestDuplicatePacksInIndex(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerDuplicateIndexTestData)
repo, _, cleanup := repository.TestFromFixture(t, checkerDuplicateIndexTestData)
defer cleanup()
chkr := checker.New(repo, false)
@ -343,11 +334,11 @@ func (b *errorOnceBackend) Load(ctx context.Context, h backend.Handle, length in
}
func TestCheckerModifiedData(t *testing.T) {
repo := repository.TestRepository(t)
repo, be := repository.TestRepositoryWithVersion(t, 0)
sn := archiver.TestSnapshot(t, repo, ".", nil)
t.Logf("archived as %v", sn.ID().Str())
errBe := &errorBackend{Backend: repo.Backend()}
errBe := &errorBackend{Backend: be}
for _, test := range []struct {
name string
@ -369,7 +360,7 @@ func TestCheckerModifiedData(t *testing.T) {
},
{
"errorOnceBackend",
&errorOnceBackend{Backend: repo.Backend()},
&errorOnceBackend{Backend: be},
func() {},
func(t *testing.T, err error) {
if !strings.Contains(err.Error(), "check successful on second attempt, original error pack") {
@ -436,7 +427,7 @@ func (r *loadTreesOnceRepository) LoadTree(ctx context.Context, id restic.ID) (*
}
func TestCheckerNoDuplicateTreeDecodes(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, _, cleanup := repository.TestFromFixture(t, checkerTestData)
defer cleanup()
checkRepo := &loadTreesOnceRepository{
Repository: repo,
@ -584,7 +575,7 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
}
func loadBenchRepository(t *testing.B) (*checker.Checker, restic.Repository, func()) {
repo, cleanup := repository.TestFromFixture(t, checkerTestData)
repo, _, cleanup := repository.TestFromFixture(t, checkerTestData)
chkr := checker.New(repo, false)
hints, errs := chkr.LoadIndex(context.TODO(), nil)

View File

@ -15,7 +15,7 @@ import (
var repoFixture = filepath.Join("..", "repository", "testdata", "test-repo.tar.gz")
func TestRepositoryForAllIndexes(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, repoFixture)
repo, _, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup()
expectedIndexIDs := restic.NewIDSet()

View File

@ -270,7 +270,7 @@ func (mi *MasterIndex) MergeFinalIndexes() error {
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// It also removes the old index files and those listed in extraObsolete.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error {
func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error {
p := opts.SaveProgress
p.SetMax(uint64(len(mi.Packs(excludePacks))))

View File

@ -342,7 +342,7 @@ var (
)
func createFilledRepo(t testing.TB, snapshots int, version uint) restic.Repository {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
for i := 0; i < snapshots; i++ {
restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth)

View File

@ -11,6 +11,7 @@ import (
"github.com/restic/restic/internal/backend/s3"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
)
@ -24,7 +25,7 @@ type S3Layout struct{}
// Check tests whether the migration can be applied.
func (m *S3Layout) Check(_ context.Context, repo restic.Repository) (bool, string, error) {
be := backend.AsBackend[*s3.Backend](repo.Backend())
be := repository.AsS3Backend(repo.(*repository.Repository))
if be == nil {
debug.Log("backend is not s3")
return false, "backend is not s3", nil
@ -76,7 +77,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou
// Apply runs the migration.
func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error {
be := backend.AsBackend[*s3.Backend](repo.Backend())
be := repository.AsS3Backend(repo.(*repository.Repository))
if be == nil {
debug.Log("backend is not s3")
return errors.New("backend is not s3")

View File

@ -3,10 +3,8 @@ package migrations
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
)
@ -14,26 +12,6 @@ func init() {
register(&UpgradeRepoV2{})
}
type UpgradeRepoV2Error struct {
UploadNewConfigError error
ReuploadOldConfigError error
BackupFilePath string
}
func (err *UpgradeRepoV2Error) Error() string {
if err.ReuploadOldConfigError != nil {
return fmt.Sprintf("error uploading config (%v), re-uploading old config filed failed as well (%v), but there is a backup of the config file in %v", err.UploadNewConfigError, err.ReuploadOldConfigError, err.BackupFilePath)
}
return fmt.Sprintf("error uploading config (%v), re-uploaded old config was successful, there is a backup of the config file in %v", err.UploadNewConfigError, err.BackupFilePath)
}
func (err *UpgradeRepoV2Error) Unwrap() error {
// consider the original upload error as the primary cause
return err.UploadNewConfigError
}
type UpgradeRepoV2 struct{}
func (*UpgradeRepoV2) Name() string {
@ -56,70 +34,7 @@ func (*UpgradeRepoV2) Check(_ context.Context, repo restic.Repository) (bool, st
func (*UpgradeRepoV2) RepoCheck() bool {
return true
}
func (*UpgradeRepoV2) upgrade(ctx context.Context, repo restic.Repository) error {
h := backend.Handle{Type: backend.ConfigFile}
if !repo.Backend().HasAtomicReplace() {
// remove the original file for backends which do not support atomic overwriting
err := repo.Backend().Remove(ctx, h)
if err != nil {
return fmt.Errorf("remove config failed: %w", err)
}
}
// upgrade config
cfg := repo.Config()
cfg.Version = 2
err := restic.SaveConfig(ctx, repo, cfg)
if err != nil {
return fmt.Errorf("save new config file failed: %w", err)
}
return nil
}
func (m *UpgradeRepoV2) Apply(ctx context.Context, repo restic.Repository) error {
tempdir, err := os.MkdirTemp("", "restic-migrate-upgrade-repo-v2-")
if err != nil {
return fmt.Errorf("create temp dir failed: %w", err)
}
h := backend.Handle{Type: restic.ConfigFile}
// read raw config file and save it to a temp dir, just in case
rawConfigFile, err := repo.LoadRaw(ctx, restic.ConfigFile, restic.ID{})
if err != nil {
return fmt.Errorf("load config file failed: %w", err)
}
backupFileName := filepath.Join(tempdir, "config")
err = os.WriteFile(backupFileName, rawConfigFile, 0600)
if err != nil {
return fmt.Errorf("write config file backup to %v failed: %w", tempdir, err)
}
// run the upgrade
err = m.upgrade(ctx, repo)
if err != nil {
// build an error we can return to the caller
repoError := &UpgradeRepoV2Error{
UploadNewConfigError: err,
BackupFilePath: backupFileName,
}
// try contingency methods, reupload the original file
_ = repo.Backend().Remove(ctx, h)
err = repo.Backend().Save(ctx, h, backend.NewByteReader(rawConfigFile, nil))
if err != nil {
repoError.ReuploadOldConfigError = err
}
return repoError
}
_ = os.Remove(backupFileName)
_ = os.Remove(tempdir)
return nil
return repository.UpgradeRepo(ctx, repo.(*repository.Repository))
}

View File

@ -2,19 +2,13 @@ package migrations
import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/test"
)
func TestUpgradeRepoV2(t *testing.T) {
repo := repository.TestRepositoryWithVersion(t, 1)
repo, _ := repository.TestRepositoryWithVersion(t, 1)
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
@ -35,73 +29,3 @@ func TestUpgradeRepoV2(t *testing.T) {
t.Fatal(err)
}
}
type failBackend struct {
backend.Backend
mu sync.Mutex
ConfigFileSavesUntilError uint
}
func (be *failBackend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
if h.Type != backend.ConfigFile {
return be.Backend.Save(ctx, h, rd)
}
be.mu.Lock()
if be.ConfigFileSavesUntilError == 0 {
be.mu.Unlock()
return errors.New("failure induced for testing")
}
be.ConfigFileSavesUntilError--
be.mu.Unlock()
return be.Backend.Save(ctx, h, rd)
}
func TestUpgradeRepoV2Failure(t *testing.T) {
be := repository.TestBackend(t)
// wrap backend so that it fails upgrading the config after the initial write
be = &failBackend{
ConfigFileSavesUntilError: 1,
Backend: be,
}
repo := repository.TestRepositoryWithBackend(t, be, 1, repository.Options{})
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
m := &UpgradeRepoV2{}
ok, _, err := m.Check(context.Background(), repo)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("migration check returned false")
}
err = m.Apply(context.Background(), repo)
if err == nil {
t.Fatal("expected error returned from Apply(), got nil")
}
upgradeErr := err.(*UpgradeRepoV2Error)
if upgradeErr.UploadNewConfigError == nil {
t.Fatal("expected upload error, got nil")
}
if upgradeErr.ReuploadOldConfigError == nil {
t.Fatal("expected reupload error, got nil")
}
if upgradeErr.BackupFilePath == "" {
t.Fatal("no backup file path found")
}
test.OK(t, os.Remove(upgradeErr.BackupFilePath))
test.OK(t, os.Remove(filepath.Dir(upgradeErr.BackupFilePath)))
}

View File

@ -0,0 +1,210 @@
package repository
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"sort"
"github.com/klauspost/compress/zstd"
"github.com/minio/sha256-simd"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/hashing"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
)
// ErrPackData is returned if errors are discovered while verifying a packfile
type ErrPackData struct {
PackID restic.ID
errs []error
}
func (e *ErrPackData) Error() string {
return fmt.Sprintf("pack %v contains %v errors: %v", e.PackID, len(e.errs), e.errs)
}
type partialReadError struct {
err error
}
func (e *partialReadError) Error() string {
return e.err.Error()
}
// CheckPack reads a pack and checks the integrity of all blobs.
func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
if err != nil {
if r.Cache != nil {
// ignore error as there's not much we can do here
_ = r.Cache.Forget(backend.Handle{Type: restic.PackFile, Name: id.String()})
}
// retry pack verification to detect transient errors
err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
if err2 != nil {
err = err2
} else {
err = fmt.Errorf("check successful on second attempt, original error %w", err)
}
}
return err
}
func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
debug.Log("checking pack %v", id.String())
if len(blobs) == 0 {
return &ErrPackData{PackID: id, errs: []error{errors.New("pack is empty or not indexed")}}
}
// sanity check blobs in index
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
idxHdrSize := pack.CalculateHeaderSize(blobs)
lastBlobEnd := 0
nonContinuousPack := false
for _, blob := range blobs {
if lastBlobEnd != int(blob.Offset) {
nonContinuousPack = true
}
lastBlobEnd = int(blob.Offset + blob.Length)
}
// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
var errs []error
if nonContinuousPack {
debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
errs = append(errs, errors.New("index for pack contains gaps / overlapping blobs"))
}
// calculate hash on-the-fly while reading the pack and capture pack header
var hash restic.ID
var hdrBuf []byte
h := backend.Handle{Type: backend.PackFile, Name: id.String()}
err := r.be.Load(ctx, h, int(size), 0, func(rd io.Reader) error {
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
it := newPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == errPackEOF {
break
} else if err != nil {
return &partialReadError{err}
}
debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle)
if val.Err != nil {
debug.Log(" error verifying blob %v: %v", val.Handle.ID, val.Err)
errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, val.Err))
}
}
// skip enough bytes until we reach the possible header start
curPos := lastBlobEnd
minHdrStart := int(size) - pack.MaxHeaderSize
if minHdrStart > curPos {
_, err := bufRd.Discard(minHdrStart - curPos)
if err != nil {
return &partialReadError{err}
}
curPos += minHdrStart - curPos
}
// read remainder, which should be the pack header
var err error
hdrBuf = make([]byte, int(size-int64(curPos)))
_, err = io.ReadFull(bufRd, hdrBuf)
if err != nil {
return &partialReadError{err}
}
hash = restic.IDFromHash(hrd.Sum(nil))
return nil
})
if err != nil {
var e *partialReadError
isPartialReadError := errors.As(err, &e)
// failed to load the pack file, return as further checks cannot succeed anyways
debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err)
if isPartialReadError {
return &ErrPackData{PackID: id, errs: append(errs, fmt.Errorf("partial download error: %w", err))}
}
// The check command suggests to repair files for which a `ErrPackData` is returned. However, this file
// completely failed to download such that there's no point in repairing anything.
return fmt.Errorf("download error: %w", err)
}
if !hash.Equal(id) {
debug.Log("pack ID does not match, want %v, got %v", id, hash)
return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("unexpected pack id %v", hash))}
}
blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf)))
if err != nil {
return &ErrPackData{PackID: id, errs: append(errs, err)}
}
if uint32(idxHdrSize) != hdrSize {
debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
}
idx := r.Index()
for _, blob := range blobs {
// Check if blob is contained in index and position is correct
idxHas := false
for _, pb := range idx.Lookup(blob.BlobHandle) {
if pb.PackID == id && pb.Blob == blob {
idxHas = true
break
}
}
if !idxHas {
errs = append(errs, errors.Errorf("blob %v is not contained in index or position is incorrect", blob.ID))
continue
}
}
if len(errs) > 0 {
return &ErrPackData{PackID: id, errs: errs}
}
return nil
}
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}

View File

@ -18,7 +18,7 @@ func FuzzSaveLoadBlob(f *testing.F) {
}
id := restic.Hash(blob)
repo := TestRepositoryWithVersion(t, 2)
repo, _ := TestRepositoryWithVersion(t, 2)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)

View File

@ -36,13 +36,13 @@ var lockerInst = &locker{
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
}
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
}
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
// cancelling the original context also stops the lock refresh
func (l *locker) Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
lockFn := restic.NewLock
if exclusive {
@ -102,7 +102,7 @@ retryLoop:
refreshChan := make(chan struct{})
forceRefreshChan := make(chan refreshLockRequest)
go l.refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
go l.refreshLocks(ctx, repo.be, lockInfo, refreshChan, forceRefreshChan, logger)
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
return &Unlocker{lockInfo}, ctx, nil

View File

@ -19,7 +19,7 @@ import (
type backendWrapper func(r backend.Backend) (backend.Backend, error)
func openLockTestRepo(t *testing.T, wrapper backendWrapper) restic.Repository {
func openLockTestRepo(t *testing.T, wrapper backendWrapper) (*Repository, backend.Backend) {
be := backend.Backend(mem.New())
// initialize repo
TestRepositoryWithBackend(t, be, 0, Options{})
@ -31,10 +31,10 @@ func openLockTestRepo(t *testing.T, wrapper backendWrapper) restic.Repository {
rtest.OK(t, err)
}
return TestOpenBackend(t, be)
return TestOpenBackend(t, be), be
}
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, lockerInst *locker, retryLock time.Duration) (*Unlocker, context.Context) {
func checkedLockRepo(ctx context.Context, t *testing.T, repo *Repository, lockerInst *locker, retryLock time.Duration) (*Unlocker, context.Context) {
lock, wrappedCtx, err := lockerInst.Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err)
test.OK(t, wrappedCtx.Err())
@ -46,7 +46,7 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository,
func TestLock(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo, _ := openLockTestRepo(t, nil)
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, lockerInst, 0)
lock.Unlock()
@ -57,7 +57,7 @@ func TestLock(t *testing.T) {
func TestLockCancel(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo, _ := openLockTestRepo(t, nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -73,8 +73,8 @@ func TestLockCancel(t *testing.T) {
func TestLockConflict(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo2 := TestOpenBackend(t, repo.Backend())
repo, be := openLockTestRepo(t, nil)
repo2 := TestOpenBackend(t, be)
lock, _, err := Lock(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err)
@ -101,7 +101,7 @@ func (b *writeOnceBackend) Save(ctx context.Context, h backend.Handle, rd backen
func TestLockFailedRefresh(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
repo, _ := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
return &writeOnceBackend{Backend: r}, nil
})
@ -138,7 +138,7 @@ func (b *loggingBackend) Save(ctx context.Context, h backend.Handle, rd backend.
func TestLockSuccessfulRefresh(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
repo, _ := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
return &loggingBackend{
Backend: r,
t: t,
@ -190,7 +190,7 @@ func (b *slowBackend) Save(ctx context.Context, h backend.Handle, rd backend.Rew
func TestLockSuccessfulStaleRefresh(t *testing.T) {
t.Parallel()
var sb *slowBackend
repo := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
repo, _ := openLockTestRepo(t, func(r backend.Backend) (backend.Backend, error) {
sb = &slowBackend{Backend: r}
return sb, nil
})
@ -238,7 +238,7 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
func TestLockWaitTimeout(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err)
@ -260,7 +260,7 @@ func TestLockWaitTimeout(t *testing.T) {
func TestLockWaitCancel(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err)
@ -286,7 +286,7 @@ func TestLockWaitCancel(t *testing.T) {
func TestLockWaitSuccess(t *testing.T) {
t.Parallel()
repo := openLockTestRepo(t, nil)
repo, _ := openLockTestRepo(t, nil)
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
test.OK(t, err)

View File

@ -621,7 +621,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
// deleteFiles deletes the given fileList of fileType in parallel
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
func deleteFiles(ctx context.Context, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error {
func deleteFiles(ctx context.Context, ignoreError bool, repo restic.RemoverUnpacked, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error {
bar := printer.NewCounter("files deleted")
defer bar.Done()

View File

@ -14,7 +14,7 @@ import (
)
func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
repo := repository.TestRepository(t).(*repository.Repository)
repo, be := repository.TestRepositoryWithVersion(t, 0)
createRandomBlobs(t, repo, 4, 0.5, true)
createRandomBlobs(t, repo, 5, 0.5, true)
keep, _ := selectBlobs(t, repo, 0.5)
@ -37,7 +37,7 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
repo = repository.TestOpenBackend(t, repo.Backend()).(*repository.Repository)
repo = repository.TestOpenBackend(t, be)
checker.TestCheckRepo(t, repo, true)
if errOnUnused {

View File

@ -167,7 +167,7 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest
}
for id := range repackedBlobs {
err = repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: id.String()})
err = repo.RemoveUnpacked(context.TODO(), restic.PackFile, id)
if err != nil {
t.Fatal(err)
}
@ -215,7 +215,7 @@ func TestRepack(t *testing.T) {
}
func testRepack(t *testing.T, version uint) {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
seed := time.Now().UnixNano()
rand.Seed(seed)
@ -293,8 +293,8 @@ func (r oneConnectionRepo) Connections() uint {
}
func testRepackCopy(t *testing.T, version uint) {
repo := repository.TestRepositoryWithVersion(t, version)
dstRepo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
dstRepo, _ := repository.TestRepositoryWithVersion(t, version)
// test with minimal possible connection count
repoWrapped := &oneConnectionRepo{repo}
@ -340,7 +340,7 @@ func TestRepackWrongBlob(t *testing.T) {
func testRepackWrongBlob(t *testing.T, version uint) {
// disable verification to allow adding corrupted blobs to the repository
repo := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
repo, _ := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
seed := time.Now().UnixNano()
rand.Seed(seed)
@ -366,7 +366,7 @@ func TestRepackBlobFallback(t *testing.T) {
func testRepackBlobFallback(t *testing.T, version uint) {
// disable verification to allow adding corrupted blobs to the repository
repo := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
repo, _ := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
seed := time.Now().UnixNano()
rand.Seed(seed)

View File

@ -16,16 +16,16 @@ func listIndex(t *testing.T, repo restic.Lister) restic.IDSet {
return listFiles(t, repo, restic.IndexFile)
}
func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, repo *repository.Repository)) {
repo := repository.TestRepository(t).(*repository.Repository)
func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, repo *repository.Repository, be backend.Backend)) {
repo, be := repository.TestRepositoryWithVersion(t, 0)
createRandomBlobs(t, repo, 4, 0.5, true)
createRandomBlobs(t, repo, 5, 0.5, true)
indexes := listIndex(t, repo)
t.Logf("old indexes %v", indexes)
damage(t, repo)
damage(t, repo, be)
repo = repository.TestOpenBackend(t, repo.Backend()).(*repository.Repository)
repo = repository.TestOpenBackend(t, be)
rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{
ReadAllPacks: readAllPacks,
}, &progress.NoopPrinter{}))
@ -40,17 +40,17 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T,
func TestRebuildIndex(t *testing.T) {
for _, test := range []struct {
name string
damage func(t *testing.T, repo *repository.Repository)
damage func(t *testing.T, repo *repository.Repository, be backend.Backend)
}{
{
"valid index",
func(t *testing.T, repo *repository.Repository) {},
func(t *testing.T, repo *repository.Repository, be backend.Backend) {},
},
{
"damaged index",
func(t *testing.T, repo *repository.Repository) {
func(t *testing.T, repo *repository.Repository, be backend.Backend) {
index := listIndex(t, repo).List()[0]
replaceFile(t, repo, backend.Handle{Type: restic.IndexFile, Name: index.String()}, func(b []byte) []byte {
replaceFile(t, be, backend.Handle{Type: restic.IndexFile, Name: index.String()}, func(b []byte) []byte {
b[0] ^= 0xff
return b
})
@ -58,16 +58,16 @@ func TestRebuildIndex(t *testing.T) {
},
{
"missing index",
func(t *testing.T, repo *repository.Repository) {
func(t *testing.T, repo *repository.Repository, be backend.Backend) {
index := listIndex(t, repo).List()[0]
rtest.OK(t, repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: index.String()}))
rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: index.String()}))
},
},
{
"missing pack",
func(t *testing.T, repo *repository.Repository) {
func(t *testing.T, repo *repository.Repository, be backend.Backend) {
pack := listPacks(t, repo).List()[0]
rtest.OK(t, repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: pack.String()}))
rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: pack.String()}))
},
},
} {

View File

@ -24,12 +24,12 @@ func listBlobs(repo restic.Repository) restic.BlobSet {
return blobs
}
func replaceFile(t *testing.T, repo restic.Repository, h backend.Handle, damage func([]byte) []byte) {
buf, err := backendtest.LoadAll(context.TODO(), repo.Backend(), h)
func replaceFile(t *testing.T, be backend.Backend, h backend.Handle, damage func([]byte) []byte) {
buf, err := backendtest.LoadAll(context.TODO(), be, h)
test.OK(t, err)
buf = damage(buf)
test.OK(t, repo.Backend().Remove(context.TODO(), h))
test.OK(t, repo.Backend().Save(context.TODO(), h, backend.NewByteReader(buf, repo.Backend().Hasher())))
test.OK(t, be.Remove(context.TODO(), h))
test.OK(t, be.Save(context.TODO(), h, backend.NewByteReader(buf, be.Hasher())))
}
func TestRepairBrokenPack(t *testing.T) {
@ -39,17 +39,17 @@ func TestRepairBrokenPack(t *testing.T) {
func testRepairBrokenPack(t *testing.T, version uint) {
tests := []struct {
name string
damage func(t *testing.T, repo restic.Repository, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet)
damage func(t *testing.T, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet)
}{
{
"valid pack",
func(t *testing.T, repo restic.Repository, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
func(t *testing.T, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
return packsBefore, restic.NewBlobSet()
},
},
{
"broken pack",
func(t *testing.T, repo restic.Repository, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
func(t *testing.T, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
wrongBlob := createRandomWrongBlob(t, repo)
damagedPacks := findPacksForBlobs(t, repo, restic.NewBlobSet(wrongBlob))
return damagedPacks, restic.NewBlobSet(wrongBlob)
@ -57,10 +57,10 @@ func testRepairBrokenPack(t *testing.T, version uint) {
},
{
"partially broken pack",
func(t *testing.T, repo restic.Repository, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
func(t *testing.T, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
// damage one of the pack files
damagedID := packsBefore.List()[0]
replaceFile(t, repo, backend.Handle{Type: backend.PackFile, Name: damagedID.String()},
replaceFile(t, be, backend.Handle{Type: backend.PackFile, Name: damagedID.String()},
func(buf []byte) []byte {
buf[0] ^= 0xff
return buf
@ -80,10 +80,10 @@ func testRepairBrokenPack(t *testing.T, version uint) {
},
}, {
"truncated pack",
func(t *testing.T, repo restic.Repository, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
func(t *testing.T, repo *repository.Repository, be backend.Backend, packsBefore restic.IDSet) (restic.IDSet, restic.BlobSet) {
// damage one of the pack files
damagedID := packsBefore.List()[0]
replaceFile(t, repo, backend.Handle{Type: backend.PackFile, Name: damagedID.String()},
replaceFile(t, be, backend.Handle{Type: backend.PackFile, Name: damagedID.String()},
func(buf []byte) []byte {
buf = buf[0:10]
return buf
@ -104,7 +104,7 @@ func testRepairBrokenPack(t *testing.T, version uint) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// disable verification to allow adding corrupted blobs to the repository
repo := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
repo, be := repository.TestRepositoryWithBackend(t, nil, version, repository.Options{NoExtraVerify: true})
seed := time.Now().UnixNano()
rand.Seed(seed)
@ -114,7 +114,7 @@ func testRepairBrokenPack(t *testing.T, version uint) {
packsBefore := listPacks(t, repo)
blobsBefore := listBlobs(repo)
toRepair, damagedBlobs := test.damage(t, repo, packsBefore)
toRepair, damagedBlobs := test.damage(t, repo, be, packsBefore)
rtest.OK(t, repository.RepairPacks(context.TODO(), repo, toRepair, &progress.NoopPrinter{}))
// reload index

View File

@ -271,7 +271,7 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu
continue
}
it := NewPackBlobIterator(blob.PackID, newByteReader(buf), uint(blob.Offset), []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder())
it := newPackBlobIterator(blob.PackID, newByteReader(buf), uint(blob.Offset), []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder())
pbv, err := it.Next()
if err == nil {
@ -520,6 +520,11 @@ func (r *Repository) verifyUnpacked(buf []byte, t restic.FileType, expected []by
return nil
}
func (r *Repository) RemoveUnpacked(ctx context.Context, t restic.FileType, id restic.ID) error {
// TODO prevent everything except removing snapshots for non-repository code
return r.be.Remove(ctx, backend.Handle{Type: t, Name: id.String()})
}
// Flush saves all remaining packs and the index
func (r *Repository) Flush(ctx context.Context) error {
if err := r.flushPacks(ctx); err != nil {
@ -574,11 +579,6 @@ func (r *Repository) flushPacks(ctx context.Context) error {
return err
}
// Backend returns the backend for the repository.
func (r *Repository) Backend() backend.Backend {
return r.be
}
func (r *Repository) Connections() uint {
return r.be.Connections()
}
@ -869,7 +869,7 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic
func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, uint32, error) {
h := backend.Handle{Type: restic.PackFile, Name: id.String()}
entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size)
entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size)
if err != nil {
if r.Cache != nil {
// ignore error as there is not much we can do here
@ -877,7 +877,7 @@ func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]
}
// retry on error
entries, hdrSize, err = pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size)
entries, hdrSize, err = pack.List(r.Key(), backend.ReaderAt(ctx, r.be, h), size)
}
return entries, hdrSize, err
}
@ -943,7 +943,7 @@ const maxUnusedRange = 1 * 1024 * 1024
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.be.Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
}
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
@ -1024,11 +1024,11 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl
return errors.Wrap(err, "StreamPack")
}
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
it := newPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
if err == errPackEOF {
break
} else if err != nil {
return err
@ -1093,7 +1093,7 @@ func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
return buf, nil
}
type PackBlobIterator struct {
type packBlobIterator struct {
packID restic.ID
rd discardReader
currentOffset uint
@ -1105,17 +1105,17 @@ type PackBlobIterator struct {
decode []byte
}
type PackBlobValue struct {
type packBlobValue struct {
Handle restic.BlobHandle
Plaintext []byte
Err error
}
var ErrPackEOF = errors.New("reached EOF of pack file")
var errPackEOF = errors.New("reached EOF of pack file")
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{
func newPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *packBlobIterator {
return &packBlobIterator{
packID: packID,
rd: rd,
currentOffset: currentOffset,
@ -1126,9 +1126,9 @@ func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
}
// Next returns the next blob, an error or ErrPackEOF if all blobs were read
func (b *PackBlobIterator) Next() (PackBlobValue, error) {
func (b *packBlobIterator) Next() (packBlobValue, error) {
if len(b.blobs) == 0 {
return PackBlobValue{}, ErrPackEOF
return packBlobValue{}, errPackEOF
}
entry := b.blobs[0]
@ -1136,12 +1136,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
skipBytes := int(entry.Offset - b.currentOffset)
if skipBytes < 0 {
return PackBlobValue{}, fmt.Errorf("overlapping blobs in pack %v", b.packID)
return packBlobValue{}, fmt.Errorf("overlapping blobs in pack %v", b.packID)
}
_, err := b.rd.Discard(skipBytes)
if err != nil {
return PackBlobValue{}, err
return packBlobValue{}, err
}
b.currentOffset = entry.Offset
@ -1151,14 +1151,14 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
buf, err := b.rd.ReadFull(int(entry.Length))
if err != nil {
debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
return packBlobValue{}, fmt.Errorf("readFull: %w", err)
}
b.currentOffset = entry.Offset + entry.Length
if int(entry.Length) <= b.key.NonceSize() {
debug.Log("%v", b.blobs)
return PackBlobValue{}, fmt.Errorf("invalid blob length %v", entry)
return packBlobValue{}, fmt.Errorf("invalid blob length %v", entry)
}
// decryption errors are likely permanent, give the caller a chance to skip them
@ -1186,7 +1186,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
}
}
return PackBlobValue{entry.BlobHandle, plaintext, err}, nil
return packBlobValue{entry.BlobHandle, plaintext, err}, nil
}
var zeroChunkOnce sync.Once

View File

@ -353,7 +353,7 @@ func testStreamPack(t *testing.T, version uint) {
}
func TestBlobVerification(t *testing.T) {
repo := TestRepository(t).(*Repository)
repo := TestRepository(t)
type DamageType string
const (
@ -402,7 +402,7 @@ func TestBlobVerification(t *testing.T) {
}
func TestUnpackedVerification(t *testing.T) {
repo := TestRepository(t).(*Repository)
repo := TestRepository(t)
type DamageType string
const (

View File

@ -45,7 +45,7 @@ func testSaveCalculateID(t *testing.T, version uint) {
}
func testSave(t *testing.T, version uint, calculateID bool) {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
for _, size := range testSizes {
data := make([]byte, size)
@ -88,7 +88,7 @@ func BenchmarkSaveAndEncrypt(t *testing.B) {
}
func benchmarkSaveAndEncrypt(t *testing.B, version uint) {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
size := 4 << 20 // 4MiB
data := make([]byte, size)
@ -114,7 +114,7 @@ func TestLoadBlob(t *testing.T) {
}
func testLoadBlob(t *testing.T, version uint) {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -145,7 +145,7 @@ func testLoadBlob(t *testing.T, version uint) {
func TestLoadBlobBroken(t *testing.T) {
be := mem.New()
repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository)
repo, _ := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{})
buf := test.Random(42, 1000)
var wg errgroup.Group
@ -170,7 +170,7 @@ func BenchmarkLoadBlob(b *testing.B) {
}
func benchmarkLoadBlob(b *testing.B, version uint) {
repo := repository.TestRepositoryWithVersion(b, version)
repo, _ := repository.TestRepositoryWithVersion(b, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -211,7 +211,7 @@ func BenchmarkLoadUnpacked(b *testing.B) {
}
func benchmarkLoadUnpacked(b *testing.B, version uint) {
repo := repository.TestRepositoryWithVersion(b, version)
repo, _ := repository.TestRepositoryWithVersion(b, version)
length := 1000000
buf := crypto.NewBlobBuffer(length)
_, err := io.ReadFull(rnd, buf)
@ -247,7 +247,7 @@ func benchmarkLoadUnpacked(b *testing.B, version uint) {
var repoFixture = filepath.Join("testdata", "test-repo.tar.gz")
func TestRepositoryLoadIndex(t *testing.T) {
repo, cleanup := repository.TestFromFixture(t, repoFixture)
repo, _, cleanup := repository.TestFromFixture(t, repoFixture)
defer cleanup()
rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
@ -268,7 +268,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*
}
func TestRepositoryLoadUnpackedBroken(t *testing.T) {
repo := repository.TestRepository(t)
repo, be := repository.TestRepositoryWithVersion(t, 0)
data := rtest.Random(23, 12345)
id := restic.Hash(data)
@ -277,7 +277,7 @@ func TestRepositoryLoadUnpackedBroken(t *testing.T) {
data[0] ^= 0xff
// store broken file
err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, repo.Backend().Hasher()))
err := be.Save(context.TODO(), h, backend.NewByteReader(data, be.Hasher()))
rtest.OK(t, err)
_, err = repo.LoadUnpacked(context.TODO(), restic.IndexFile, id)
@ -322,7 +322,7 @@ func BenchmarkLoadIndex(b *testing.B) {
func benchmarkLoadIndex(b *testing.B, version uint) {
repository.TestUseLowSecurityKDFParameters(b)
repo := repository.TestRepositoryWithVersion(b, version)
repo, be := repository.TestRepositoryWithVersion(b, version)
idx := index.NewIndex()
for i := 0; i < 5000; i++ {
@ -340,7 +340,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
rtest.OK(b, err)
b.Logf("index saved as %v", id.Str())
fi, err := repo.Backend().Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()})
fi, err := be.Stat(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: id.String()})
rtest.OK(b, err)
b.Logf("filesize is %v", fi.Size)
@ -374,7 +374,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) {
}
func testRepositoryIncrementalIndex(t *testing.T, version uint) {
repo := repository.TestRepositoryWithVersion(t, version).(*repository.Repository)
repo, _ := repository.TestRepositoryWithVersion(t, version)
index.IndexFull = func(*index.Index, bool) bool { return true }
@ -425,7 +425,7 @@ func TestInvalidCompression(t *testing.T) {
func TestListPack(t *testing.T) {
be := mem.New()
repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository)
repo, _ := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{})
buf := test.Random(42, 1000)
var wg errgroup.Group
@ -440,7 +440,7 @@ func TestListPack(t *testing.T) {
// Forcibly cache pack file
packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID
rtest.OK(t, repo.Backend().Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil }))
rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil }))
// Get size to list pack
var size int64

View File

@ -0,0 +1,12 @@
package repository
import (
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/s3"
)
// AsS3Backend extracts the S3 backend from a repository
// TODO remove me once restic 0.17 was released
func AsS3Backend(repo *Repository) *s3.Backend {
return backend.AsBackend[*s3.Backend](repo.be)
}

View File

@ -46,7 +46,7 @@ const testChunkerPol = chunker.Pol(0x3DA3358B4DC173)
// TestRepositoryWithBackend returns a repository initialized with a test
// password. If be is nil, an in-memory backend is used. A constant polynomial
// is used for the chunker and low-security test parameters.
func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, opts Options) restic.Repository {
func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, opts Options) (*Repository, backend.Backend) {
t.Helper()
TestUseLowSecurityKDFParameters(t)
restic.TestDisableCheckPolynomial(t)
@ -69,19 +69,20 @@ func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, o
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
}
return repo
return repo, be
}
// TestRepository returns a repository initialized with a test password on an
// in-memory backend. When the environment variable RESTIC_TEST_REPO is set to
// a non-existing directory, a local backend is created there and this is used
// instead. The directory is not removed, but left there for inspection.
func TestRepository(t testing.TB) restic.Repository {
func TestRepository(t testing.TB) *Repository {
t.Helper()
return TestRepositoryWithVersion(t, 0)
repo, _ := TestRepositoryWithVersion(t, 0)
return repo
}
func TestRepositoryWithVersion(t testing.TB, version uint) restic.Repository {
func TestRepositoryWithVersion(t testing.TB, version uint) (*Repository, backend.Backend) {
t.Helper()
dir := os.Getenv("RESTIC_TEST_REPO")
opts := Options{}
@ -103,15 +104,15 @@ func TestRepositoryWithVersion(t testing.TB, version uint) restic.Repository {
return TestRepositoryWithBackend(t, nil, version, opts)
}
func TestFromFixture(t testing.TB, repoFixture string) (restic.Repository, func()) {
func TestFromFixture(t testing.TB, repoFixture string) (*Repository, backend.Backend, func()) {
repodir, cleanup := test.Env(t, repoFixture)
repo := TestOpenLocal(t, repodir)
repo, be := TestOpenLocal(t, repodir)
return repo, cleanup
return repo, be, cleanup
}
// TestOpenLocal opens a local repository.
func TestOpenLocal(t testing.TB, dir string) restic.Repository {
func TestOpenLocal(t testing.TB, dir string) (*Repository, backend.Backend) {
var be backend.Backend
be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2})
if err != nil {
@ -120,10 +121,10 @@ func TestOpenLocal(t testing.TB, dir string) restic.Repository {
be = retry.New(be, 3, nil, nil)
return TestOpenBackend(t, be)
return TestOpenBackend(t, be), be
}
func TestOpenBackend(t testing.TB, be backend.Backend) restic.Repository {
func TestOpenBackend(t testing.TB, be backend.Backend) *Repository {
repo, err := New(be, Options{})
if err != nil {
t.Fatal(err)

View File

@ -0,0 +1,103 @@
package repository
import (
"context"
"fmt"
"os"
"path/filepath"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/restic"
)
type upgradeRepoV2Error struct {
UploadNewConfigError error
ReuploadOldConfigError error
BackupFilePath string
}
func (err *upgradeRepoV2Error) Error() string {
if err.ReuploadOldConfigError != nil {
return fmt.Sprintf("error uploading config (%v), re-uploading old config filed failed as well (%v), but there is a backup of the config file in %v", err.UploadNewConfigError, err.ReuploadOldConfigError, err.BackupFilePath)
}
return fmt.Sprintf("error uploading config (%v), re-uploaded old config was successful, there is a backup of the config file in %v", err.UploadNewConfigError, err.BackupFilePath)
}
func (err *upgradeRepoV2Error) Unwrap() error {
// consider the original upload error as the primary cause
return err.UploadNewConfigError
}
func upgradeRepository(ctx context.Context, repo *Repository) error {
h := backend.Handle{Type: backend.ConfigFile}
if !repo.be.HasAtomicReplace() {
// remove the original file for backends which do not support atomic overwriting
err := repo.be.Remove(ctx, h)
if err != nil {
return fmt.Errorf("remove config failed: %w", err)
}
}
// upgrade config
cfg := repo.Config()
cfg.Version = 2
err := restic.SaveConfig(ctx, repo, cfg)
if err != nil {
return fmt.Errorf("save new config file failed: %w", err)
}
return nil
}
func UpgradeRepo(ctx context.Context, repo *Repository) error {
if repo.Config().Version != 1 {
return fmt.Errorf("repository has version %v, only upgrades from version 1 are supported", repo.Config().Version)
}
tempdir, err := os.MkdirTemp("", "restic-migrate-upgrade-repo-v2-")
if err != nil {
return fmt.Errorf("create temp dir failed: %w", err)
}
h := backend.Handle{Type: restic.ConfigFile}
// read raw config file and save it to a temp dir, just in case
rawConfigFile, err := repo.LoadRaw(ctx, restic.ConfigFile, restic.ID{})
if err != nil {
return fmt.Errorf("load config file failed: %w", err)
}
backupFileName := filepath.Join(tempdir, "config")
err = os.WriteFile(backupFileName, rawConfigFile, 0600)
if err != nil {
return fmt.Errorf("write config file backup to %v failed: %w", tempdir, err)
}
// run the upgrade
err = upgradeRepository(ctx, repo)
if err != nil {
// build an error we can return to the caller
repoError := &upgradeRepoV2Error{
UploadNewConfigError: err,
BackupFilePath: backupFileName,
}
// try contingency methods, reupload the original file
_ = repo.be.Remove(ctx, h)
err = repo.be.Save(ctx, h, backend.NewByteReader(rawConfigFile, nil))
if err != nil {
repoError.ReuploadOldConfigError = err
}
return repoError
}
_ = os.Remove(backupFileName)
_ = os.Remove(tempdir)
return nil
}

View File

@ -0,0 +1,82 @@
package repository
import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors"
rtest "github.com/restic/restic/internal/test"
)
func TestUpgradeRepoV2(t *testing.T) {
repo, _ := TestRepositoryWithVersion(t, 1)
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
err := UpgradeRepo(context.Background(), repo)
rtest.OK(t, err)
}
type failBackend struct {
backend.Backend
mu sync.Mutex
ConfigFileSavesUntilError uint
}
func (be *failBackend) Save(ctx context.Context, h backend.Handle, rd backend.RewindReader) error {
if h.Type != backend.ConfigFile {
return be.Backend.Save(ctx, h, rd)
}
be.mu.Lock()
if be.ConfigFileSavesUntilError == 0 {
be.mu.Unlock()
return errors.New("failure induced for testing")
}
be.ConfigFileSavesUntilError--
be.mu.Unlock()
return be.Backend.Save(ctx, h, rd)
}
func TestUpgradeRepoV2Failure(t *testing.T) {
be := TestBackend(t)
// wrap backend so that it fails upgrading the config after the initial write
be = &failBackend{
ConfigFileSavesUntilError: 1,
Backend: be,
}
repo, _ := TestRepositoryWithBackend(t, be, 1, Options{})
if repo.Config().Version != 1 {
t.Fatal("test repo has wrong version")
}
err := UpgradeRepo(context.Background(), repo)
if err == nil {
t.Fatal("expected error returned from Apply(), got nil")
}
upgradeErr := err.(*upgradeRepoV2Error)
if upgradeErr.UploadNewConfigError == nil {
t.Fatal("expected upload error, got nil")
}
if upgradeErr.ReuploadOldConfigError == nil {
t.Fatal("expected reupload error, got nil")
}
if upgradeErr.BackupFilePath == "" {
t.Fatal("no backup file path found")
}
rtest.OK(t, os.Remove(upgradeErr.BackupFilePath))
rtest.OK(t, os.Remove(filepath.Dir(upgradeErr.BackupFilePath)))
}

View File

@ -12,7 +12,6 @@ import (
"testing"
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/debug"
@ -36,7 +35,7 @@ type Lock struct {
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
repo Repository
repo Unpacked
lockID *ID
}
@ -87,14 +86,14 @@ var ErrRemovedLock = errors.New("lock file was removed in the meantime")
// NewLock returns a new, non-exclusive lock for the repository. If an
// exclusive lock is already held by another process, it returns an error
// that satisfies IsAlreadyLocked.
func NewLock(ctx context.Context, repo Repository) (*Lock, error) {
func NewLock(ctx context.Context, repo Unpacked) (*Lock, error) {
return newLock(ctx, repo, false)
}
// NewExclusiveLock returns a new, exclusive lock for the repository. If
// another lock (normal and exclusive) is already held by another process,
// it returns an error that satisfies IsAlreadyLocked.
func NewExclusiveLock(ctx context.Context, repo Repository) (*Lock, error) {
func NewExclusiveLock(ctx context.Context, repo Unpacked) (*Lock, error) {
return newLock(ctx, repo, true)
}
@ -106,7 +105,7 @@ func TestSetLockTimeout(t testing.TB, d time.Duration) {
waitBeforeLockCheck = d
}
func newLock(ctx context.Context, repo Repository, excl bool) (*Lock, error) {
func newLock(ctx context.Context, repo Unpacked, excl bool) (*Lock, error) {
lock := &Lock{
Time: time.Now(),
PID: os.Getpid(),
@ -226,7 +225,7 @@ func (l *Lock) Unlock() error {
return nil
}
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: l.lockID.String()})
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *l.lockID)
}
var StaleLockTimeout = 30 * time.Minute
@ -286,7 +285,7 @@ func (l *Lock) Refresh(ctx context.Context) error {
oldLockID := l.lockID
l.lockID = &id
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: oldLockID.String()})
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *oldLockID)
}
// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
@ -315,13 +314,13 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
exists, err = l.checkExistence(ctx)
if err != nil {
// cleanup replacement lock
_ = l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: id.String()})
_ = l.repo.RemoveUnpacked(context.TODO(), LockFile, id)
return err
}
if !exists {
// cleanup replacement lock
_ = l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: id.String()})
_ = l.repo.RemoveUnpacked(context.TODO(), LockFile, id)
return ErrRemovedLock
}
@ -332,7 +331,7 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
oldLockID := l.lockID
l.lockID = &id
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: oldLockID.String()})
return l.repo.RemoveUnpacked(context.TODO(), LockFile, *oldLockID)
}
func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
@ -390,7 +389,7 @@ func LoadLock(ctx context.Context, repo LoaderUnpacked, id ID) (*Lock, error) {
}
// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
func RemoveStaleLocks(ctx context.Context, repo Unpacked) (uint, error) {
var processed uint
err := ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error {
if err != nil {
@ -400,7 +399,7 @@ func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
}
if lock.Stale() {
err = repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: id.String()})
err = repo.RemoveUnpacked(ctx, LockFile, id)
if err == nil {
processed++
}
@ -413,10 +412,10 @@ func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
}
// RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
func RemoveAllLocks(ctx context.Context, repo Unpacked) (uint, error) {
var processed uint32
err := ParallelList(ctx, repo, LockFile, repo.Connections(), func(ctx context.Context, id ID, _ int64) error {
err := repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: id.String()})
err := repo.RemoveUnpacked(ctx, LockFile, id)
if err == nil {
atomic.AddUint32(&processed, 1)
}

View File

@ -66,7 +66,7 @@ func (be *failLockLoadingBackend) Load(ctx context.Context, h backend.Handle, le
func TestMultipleLockFailure(t *testing.T) {
be := &failLockLoadingBackend{Backend: mem.New()}
repo := repository.TestRepositoryWithBackend(t, be, 0, repository.Options{})
repo, _ := repository.TestRepositoryWithBackend(t, be, 0, repository.Options{})
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock1, err := restic.NewLock(context.TODO(), repo)
@ -130,9 +130,8 @@ func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID,
return restic.SaveJSONUnpacked(context.TODO(), repo, restic.LockFile, &newLock)
}
func removeLock(repo restic.Repository, id restic.ID) error {
h := backend.Handle{Type: restic.LockFile, Name: id.String()}
return repo.Backend().Remove(context.TODO(), h)
func removeLock(repo restic.RemoverUnpacked, id restic.ID) error {
return repo.RemoveUnpacked(context.TODO(), restic.LockFile, id)
}
var staleLockTests = []struct {
@ -191,13 +190,16 @@ func TestLockStale(t *testing.T) {
}
}
func lockExists(repo restic.Repository, t testing.TB, id restic.ID) bool {
h := backend.Handle{Type: restic.LockFile, Name: id.String()}
_, err := repo.Backend().Stat(context.TODO(), h)
if err != nil && !repo.Backend().IsNotExist(err) {
t.Fatal(err)
}
return err == nil
func lockExists(repo restic.Lister, t testing.TB, lockID restic.ID) bool {
var exists bool
rtest.OK(t, repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
if id == lockID {
exists = true
}
return nil
}))
return exists
}
func TestLockWithStaleLock(t *testing.T) {
@ -310,7 +312,7 @@ func TestLockRefreshStale(t *testing.T) {
}
func TestLockRefreshStaleMissing(t *testing.T) {
repo := repository.TestRepository(t)
repo, be := repository.TestRepositoryWithVersion(t, 0)
restic.TestSetLockTimeout(t, 5*time.Millisecond)
lock, err := restic.NewLock(context.TODO(), repo)
@ -318,7 +320,7 @@ func TestLockRefreshStaleMissing(t *testing.T) {
lockID := checkSingleLock(t, repo)
// refresh must fail if lock was removed
rtest.OK(t, repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.LockFile, Name: lockID.String()}))
rtest.OK(t, be.Remove(context.TODO(), backend.Handle{Type: restic.LockFile, Name: lockID.String()}))
time.Sleep(time.Millisecond)
err = lock.RefreshStaleLock(context.TODO())
rtest.Assert(t, err == restic.ErrRemovedLock, "unexpected error, expected %v, got %v", restic.ErrRemovedLock, err)

View File

@ -3,7 +3,6 @@ package restic
import (
"context"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
@ -55,7 +54,7 @@ func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, f
// ParallelRemove deletes the given fileList of fileType in parallel
// if callback returns an error, then it will abort.
func ParallelRemove(ctx context.Context, repo Repository, fileList IDSet, fileType FileType, report func(id ID, err error) error, bar *progress.Counter) error {
func ParallelRemove(ctx context.Context, repo RemoverUnpacked, fileList IDSet, fileType FileType, report func(id ID, err error) error, bar *progress.Counter) error {
fileChan := make(chan ID)
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
@ -77,8 +76,7 @@ func ParallelRemove(ctx context.Context, repo Repository, fileList IDSet, fileTy
for i := 0; i < int(workerCount); i++ {
wg.Go(func() error {
for id := range fileChan {
h := backend.Handle{Type: fileType, Name: id.String()}
err := repo.Backend().Remove(ctx, h)
err := repo.RemoveUnpacked(ctx, fileType, id)
if report != nil {
err = report(id, err)
}

View File

@ -16,9 +16,6 @@ var ErrInvalidData = errors.New("invalid data returned")
// Repository stores data in a backend. It provides high-level functions and
// transparently encrypts/decrypts data.
type Repository interface {
// Backend returns the backend used by the repository
Backend() backend.Backend
// Connections returns the maximum number of concurrent backend operations
Connections() uint
@ -57,6 +54,8 @@ type Repository interface {
// LoadUnpacked loads and decrypts the file with the given type and ID.
LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error)
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
// RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots.
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
// LoadRaw reads all data stored in the backend for the file with id and filetype t.
// If the backend returns data that does not match the id, then the buffer is returned
@ -90,6 +89,18 @@ type SaverUnpacked interface {
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
}
// RemoverUnpacked allows removing an unpacked blob
type RemoverUnpacked interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
}
type SaverRemoverUnpacked interface {
SaverUnpacked
RemoverUnpacked
}
type PackBlobs struct {
PackID ID
Blobs []Blob
@ -112,7 +123,7 @@ type MasterIndex interface {
Each(ctx context.Context, fn func(PackedBlob)) error
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error
Save(ctx context.Context, repo SaverRemoverUnpacked, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error
}
// Lister allows listing files in a backend.
@ -124,3 +135,9 @@ type ListerLoaderUnpacked interface {
Lister
LoaderUnpacked
}
type Unpacked interface {
ListerLoaderUnpacked
SaverUnpacked
RemoverUnpacked
}

View File

@ -32,7 +32,7 @@ func TestLoadJSONUnpacked(t *testing.T) {
}
func testLoadJSONUnpacked(t *testing.T, version uint) {
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
// archive a snapshot
sn := restic.Snapshot{}

View File

@ -190,7 +190,7 @@ func ParseDurationOrPanic(s string) Duration {
// TestLoadAllSnapshots returns a list of all snapshots in the repo.
// If a snapshot ID is in excludeIDs, it will not be included in the result.
func TestLoadAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet) (snapshots Snapshots, err error) {
func TestLoadAllSnapshots(ctx context.Context, repo ListerLoaderUnpacked, excludeIDs IDSet) (snapshots Snapshots, err error) {
err = ForAllSnapshots(ctx, repo, repo, excludeIDs, func(id ID, sn *Snapshot, err error) error {
if err != nil {
return err

View File

@ -181,7 +181,7 @@ func testLoadTree(t *testing.T, version uint) {
}
// archive a few files
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush(context.Background()))
@ -199,7 +199,7 @@ func benchmarkLoadTree(t *testing.B, version uint) {
}
// archive a few files
repo := repository.TestRepositoryWithVersion(t, version)
repo, _ := repository.TestRepositoryWithVersion(t, version)
sn := archiver.TestSnapshot(t, repo, rtest.BenchArchiveDirectory, nil)
rtest.OK(t, repo.Flush(context.Background()))