From d05f6211d151783cb2f1a26d05e5965ad78a89db Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 23 Apr 2023 12:16:54 +0200
Subject: [PATCH] lock: Do not limit backend concurrency for lock files

restic must be able to refresh lock files in time. However, large
uploads over slow connections can cause the lock refresh to be stuck
behind the large uploads and thus time out.
---
 changelog/unreleased/pull-4304        |  5 +++++
 internal/backend/sema/backend.go      | 23 +++++++++++++++--------
 internal/backend/sema/backend_test.go | 33 ++++++++++++++++++++++++++-------
 3 files changed, 46 insertions(+), 15 deletions(-)
 create mode 100644 changelog/unreleased/pull-4304

diff --git a/changelog/unreleased/pull-4304 b/changelog/unreleased/pull-4304
new file mode 100644
index 000000000..ca3c7a8db
--- /dev/null
+++ b/changelog/unreleased/pull-4304
@@ -0,0 +1,5 @@
+Bugfix: Avoid lock refresh issues with slow network connections
+
+On network connections with a low upload speed, restic could often fail backups and other operations with `Fatal: failed to refresh lock in time`. We've reworked the lock refresh to avoid this error.
+
+https://github.com/restic/restic/pull/4304
diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go
index fc4a9dde5..dd4859ed1 100644
--- a/internal/backend/sema/backend.go
+++ b/internal/backend/sema/backend.go
@@ -31,14 +31,24 @@ func NewBackend(be restic.Backend) restic.Backend {
 	}
 }
 
+// typeDependentLimit acquires a token unless the FileType is a lock file. The returned function
+// must be called to release the token.
+func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() {
+	// allow concurrent lock file operations to ensure that the lock refresh is always possible
+	if t == restic.LockFile {
+		return func() {}
+	}
+	be.sem.GetToken()
+	return be.sem.ReleaseToken
+}
+
 // Save adds new Data to the backend.
 func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
 	if err := h.Valid(); err != nil {
 		return backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Save(ctx, h, rd)
 }
@@ -56,8 +66,7 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
 		return backoff.Permanent(errors.Errorf("invalid length %d", length))
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Load(ctx, h, length, offset, fn)
 }
@@ -68,8 +77,7 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
 		return restic.FileInfo{}, backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Stat(ctx, h)
 }
@@ -80,8 +88,7 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
 		return backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Remove(ctx, h)
 }
diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go
index db9559840..dc599b7f8 100644
--- a/internal/backend/sema/backend_test.go
+++ b/internal/backend/sema/backend_test.go
@@ -88,7 +88,7 @@ func countingBlocker() (func(), func(int) int) {
 	unblock := func(expected int) int {
 		// give goroutines enough time to block
 		var blocked int64
-		for i := 0; i < 100 && blocked != int64(expected); i++ {
+		for i := 0; i < 100 && blocked < int64(expected); i++ {
 			time.Sleep(100 * time.Microsecond)
 			blocked = atomic.LoadInt64(&ctr)
 		}
@@ -99,8 +99,9 @@ func countingBlocker() (func(), func(int) int) {
 	return wait, unblock
 }
 
-func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) {
+func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int, isUnlimited bool) {
 	expectBlocked := int(2)
+	workerCount := expectBlocked + 1
 
 	m := mock.NewBackend()
 	setup(m)
@@ -108,10 +109,13 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
 	be := sema.NewBackend(m)
 
 	var wg errgroup.Group
-	for i := 0; i < int(expectBlocked+1); i++ {
+	for i := 0; i < workerCount; i++ {
 		wg.Go(handler(be))
 	}
 
+	if isUnlimited {
+		expectBlocked = workerCount
+	}
 	blocked := unblock(expectBlocked)
 	test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked)
 	test.OK(t, wg.Wait())
@@ -129,7 +133,7 @@ func TestConcurrencyLimitSave(t *testing.T) {
 			h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
 			return be.Save(context.TODO(), h, nil)
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitLoad(t *testing.T) {
@@ -145,7 +149,7 @@ func TestConcurrencyLimitLoad(t *testing.T) {
 			nilCb := func(rd io.Reader) error { return nil }
 			return be.Load(context.TODO(), h, 10, 0, nilCb)
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitStat(t *testing.T) {
@@ -161,7 +165,7 @@ func TestConcurrencyLimitStat(t *testing.T) {
 			_, err := be.Stat(context.TODO(), h)
 			return err
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitDelete(t *testing.T) {
@@ -176,5 +180,20 @@ func TestConcurrencyLimitDelete(t *testing.T) {
 			h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
 			return be.Remove(context.TODO(), h)
 		}
-	}, unblock)
+	}, unblock, false)
+}
+
+func TestConcurrencyUnlimitedLockSave(t *testing.T) {
+	wait, unblock := countingBlocker()
+	concurrencyTester(t, func(m *mock.Backend) {
+		m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
+			wait()
+			return nil
+		}
+	}, func(be restic.Backend) func() error {
+		return func() error {
+			h := restic.Handle{Type: restic.LockFile, Name: "foobar"}
+			return be.Save(context.TODO(), h, nil)
+		}
+	}, unblock, true)
+}
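
Note: the `defer be.typeDependentLimit(h.Type)()` idiom above is easy to
misread. The call be.typeDependentLimit(h.Type) runs immediately when the
defer statement executes, so the token is acquired (or skipped for lock
files) before the backend operation starts; only the returned release
function is deferred. The following stand-alone Go sketch illustrates the
pattern; fileType, semaphore, limitedBackend and save are simplified
stand-ins for the types in this patch, not restic's actual API.

package main

import (
	"fmt"
	"sync"
)

type fileType int

const (
	packFile fileType = iota
	lockFile
)

// semaphore limits concurrency via a buffered channel.
type semaphore chan struct{}

func (s semaphore) getToken()     { s <- struct{}{} }
func (s semaphore) releaseToken() { <-s }

type limitedBackend struct {
	sem semaphore
}

// typeDependentLimit acquires a token unless t is a lock file. The returned
// function releases the token (or does nothing for lock files).
func (be *limitedBackend) typeDependentLimit(t fileType) func() {
	if t == lockFile {
		return func() {} // lock files bypass the connection limit
	}
	be.sem.getToken()
	return be.sem.releaseToken
}

func (be *limitedBackend) save(t fileType, name string) {
	// typeDependentLimit runs now; only the returned release is deferred.
	defer be.typeDependentLimit(t)()
	fmt.Println("saving", name)
}

func main() {
	// At most two concurrent pack operations.
	be := &limitedBackend{sem: make(semaphore, 2)}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			be.save(packFile, fmt.Sprintf("pack-%d", i))
		}(i)
	}
	// The lock "refresh" is never queued behind the pack uploads.
	be.save(lockFile, "lock-refresh")
	wg.Wait()
}

This mirrors the behavior the new TestConcurrencyUnlimitedLockSave test
asserts: a Save of a lock file proceeds even while pack uploads hold every
token.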
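
Note on the test helper: changing the polling condition from
`blocked != int64(expected)` to `blocked < int64(expected)` makes unblock
wait until at least `expected` goroutines are blocked, rather than exactly
that many. A compact stand-alone sketch of the countingBlocker idea follows;
the main function is hypothetical demo scaffolding, not part of the test
suite.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func countingBlocker() (func(), func(int) int) {
	var ctr int64
	blockCh := make(chan struct{})

	// wait is called by the workers: count ourselves, then block.
	wait := func() {
		atomic.AddInt64(&ctr, 1)
		<-blockCh
	}
	// unblock polls until at least `expected` workers are blocked (with a
	// bounded number of retries), releases them, and reports the count.
	unblock := func(expected int) int {
		var blocked int64
		for i := 0; i < 100 && blocked < int64(expected); i++ {
			time.Sleep(100 * time.Microsecond)
			blocked = atomic.LoadInt64(&ctr)
		}
		close(blockCh)
		return int(blocked)
	}
	return wait, unblock
}

func main() {
	wait, unblock := countingBlocker()
	for i := 0; i < 3; i++ {
		go wait()
	}
	fmt.Println("blocked:", unblock(3)) // expected output: blocked: 3
}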