diff --git a/changelog/unreleased/pull-4304 b/changelog/unreleased/pull-4304 new file mode 100644 index 000000000..ca3c7a8db --- /dev/null +++ b/changelog/unreleased/pull-4304 @@ -0,0 +1,5 @@ +Bugfix: Avoid lock refresh issues with slow network connections + +On network connections with a low upload speed, restic could often fail backups and other operations with `Fatal: failed to refresh lock in time`. We've reworked the lock refresh to avoid this error. + +https://github.com/restic/restic/pull/4304 diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go index fc4a9dde5..dd4859ed1 100644 --- a/internal/backend/sema/backend.go +++ b/internal/backend/sema/backend.go @@ -31,14 +31,24 @@ func NewBackend(be restic.Backend) restic.Backend { } } +// typeDependentLimit acquires a token unless the FileType is a lock file. The returned function +// must be called to release the token. +func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() { + // allow concurrent lock file operations to ensure that the lock refresh is always possible + if t == restic.LockFile { + return func() {} + } + be.sem.GetToken() + return be.sem.ReleaseToken +} + // Save adds new Data to the backend. 
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { if err := h.Valid(); err != nil { return backoff.Permanent(err) } - be.sem.GetToken() - defer be.sem.ReleaseToken() + defer be.typeDependentLimit(h.Type)() return be.Backend.Save(ctx, h, rd) } @@ -56,8 +66,7 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l return backoff.Permanent(errors.Errorf("invalid length %d", length)) } - be.sem.GetToken() - defer be.sem.ReleaseToken() + defer be.typeDependentLimit(h.Type)() return be.Backend.Load(ctx, h, length, offset, fn) } @@ -68,8 +77,7 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) ( return restic.FileInfo{}, backoff.Permanent(err) } - be.sem.GetToken() - defer be.sem.ReleaseToken() + defer be.typeDependentLimit(h.Type)() return be.Backend.Stat(ctx, h) } @@ -80,8 +88,7 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) return backoff.Permanent(err) } - be.sem.GetToken() - defer be.sem.ReleaseToken() + defer be.typeDependentLimit(h.Type)() return be.Backend.Remove(ctx, h) } diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go index db9559840..dc599b7f8 100644 --- a/internal/backend/sema/backend_test.go +++ b/internal/backend/sema/backend_test.go @@ -88,7 +88,7 @@ func countingBlocker() (func(), func(int) int) { unblock := func(expected int) int { // give goroutines enough time to block var blocked int64 - for i := 0; i < 100 && blocked != int64(expected); i++ { + for i := 0; i < 100 && blocked < int64(expected); i++ { time.Sleep(100 * time.Microsecond) blocked = atomic.LoadInt64(&ctr) } @@ -99,8 +99,9 @@ func countingBlocker() (func(), func(int) int) { return wait, unblock } -func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) { +func concurrencyTester(t *testing.T, setup func(m 
*mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int, isUnlimited bool) { expectBlocked := int(2) + workerCount := expectBlocked + 1 m := mock.NewBackend() setup(m) @@ -108,10 +109,13 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b be := sema.NewBackend(m) var wg errgroup.Group - for i := 0; i < int(expectBlocked+1); i++ { + for i := 0; i < workerCount; i++ { wg.Go(handler(be)) } + if isUnlimited { + expectBlocked = workerCount + } blocked := unblock(expectBlocked) test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked) test.OK(t, wg.Wait()) @@ -129,7 +133,7 @@ func TestConcurrencyLimitSave(t *testing.T) { h := restic.Handle{Type: restic.PackFile, Name: "foobar"} return be.Save(context.TODO(), h, nil) } - }, unblock) + }, unblock, false) } func TestConcurrencyLimitLoad(t *testing.T) { @@ -145,7 +149,7 @@ func TestConcurrencyLimitLoad(t *testing.T) { nilCb := func(rd io.Reader) error { return nil } return be.Load(context.TODO(), h, 10, 0, nilCb) } - }, unblock) + }, unblock, false) } func TestConcurrencyLimitStat(t *testing.T) { @@ -161,7 +165,7 @@ func TestConcurrencyLimitStat(t *testing.T) { _, err := be.Stat(context.TODO(), h) return err } - }, unblock) + }, unblock, false) } func TestConcurrencyLimitDelete(t *testing.T) { @@ -176,5 +180,20 @@ func TestConcurrencyLimitDelete(t *testing.T) { h := restic.Handle{Type: restic.PackFile, Name: "foobar"} return be.Remove(context.TODO(), h) } - }, unblock) + }, unblock, false) +} + +func TestConcurrencyUnlimitedLockSave(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + wait() + return nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.LockFile, Name: "foobar"} + return be.Save(context.TODO(), h, nil) + } + }, unblock, 
true) }