This commit is contained in:
Michael Eischer 2024-05-02 10:18:05 -04:00 committed by GitHub
commit 76b656a88b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 124 additions and 46 deletions

View File

@ -416,12 +416,16 @@ func OpenRepository(ctx context.Context, opts GlobalOptions) (*repository.Reposi
}
report := func(msg string, err error, d time.Duration) {
Warnf("%v returned error, retrying after %v: %v\n", msg, d, err)
// A negative duration marks the final error that is reported once no
// further retry will be attempted; any other value is the delay before
// the next attempt.
if d >= 0 {
	Warnf("%v returned error, retrying after %v: %v\n", msg, d, err)
} else {
	Warnf("%v failed: %v\n", msg, err)
}
}
success := func(msg string, retries int) {
Warnf("%v operation successful after %d retries\n", msg, retries)
}
be = retry.New(be, 10, report, success)
be = retry.New(be, 15*time.Minute, report, success)
// wrap backend if a test specified a hook
if opts.backendTestHook != nil {

View File

@ -15,9 +15,9 @@ import (
// backoff.
type Backend struct {
backend.Backend
MaxTries int
Report func(string, error, time.Duration)
Success func(string, int)
MaxElapsedTime time.Duration
Report func(string, error, time.Duration)
Success func(string, int)
}
// statically ensure that *Backend implements backend.Backend.
@ -27,32 +27,64 @@ var _ backend.Backend = &Backend{}
// backoff. report is called with a description and the error, if one occurred.
// success is called with the number of retries before a successful operation
// (it is not called if it succeeded on the first try)
func New(be backend.Backend, maxTries int, report func(string, error, time.Duration), success func(string, int)) *Backend {
func New(be backend.Backend, maxElapsedTime time.Duration, report func(string, error, time.Duration), success func(string, int)) *Backend {
return &Backend{
Backend: be,
MaxTries: maxTries,
Report: report,
Success: success,
Backend: be,
MaxElapsedTime: maxElapsedTime,
Report: report,
Success: success,
}
}
// retryNotifyErrorWithSuccess is an extension of backoff.RetryNotify with notification of success after an error.
// success is NOT notified on the first run of operation (only after an error).
func retryNotifyErrorWithSuccess(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, success func(retries int)) error {
var operationWrapper backoff.Operation
if success == nil {
return backoff.RetryNotify(operation, b, notify)
}
retries := 0
operationWrapper := func() error {
err := operation()
if err != nil {
retries++
} else if retries > 0 {
success(retries)
operationWrapper = operation
} else {
retries := 0
operationWrapper = func() error {
err := operation()
if err != nil {
retries++
} else if retries > 0 {
success(retries)
}
return err
}
return err
}
return backoff.RetryNotify(operationWrapper, b, notify)
err := backoff.RetryNotify(operationWrapper, b, notify)
if err != nil && notify != nil {
// log final error
notify(err, -1)
}
return err
}
// withRetryAtLeastOnce wraps delegate in a retryAtLeastOnce whose call
// counter starts at zero.
func withRetryAtLeastOnce(delegate *backoff.ExponentialBackOff) *retryAtLeastOnce {
	wrapped := new(retryAtLeastOnce)
	wrapped.delegate = delegate
	return wrapped
}
// retryAtLeastOnce wraps an exponential backoff and counts how often
// NextBackOff has been called since the last Reset.
type retryAtLeastOnce struct {
// delegate computes the actual backoff intervals.
delegate *backoff.ExponentialBackOff
// numTries is the number of NextBackOff calls since the last Reset.
numTries uint64
}
// NextBackOff asks the wrapped backoff for the next interval and passes it
// through, with one exception: if the very first call since the last Reset
// already yields the delegate's Stop value, the delegate's InitialInterval
// is returned instead, so that one retry still takes place.
func (b *retryAtLeastOnce) NextBackOff() time.Duration {
	next := b.delegate.NextBackOff()
	b.numTries++

	// Only the first call may override a stop signal.
	if b.numTries != 1 || next != b.delegate.Stop {
		return next
	}
	return b.delegate.InitialInterval
}
// Reset clears the call counter and resets the wrapped backoff, so that the
// next NextBackOff call is treated as the first one again.
func (b *retryAtLeastOnce) Reset() {
b.numTries = 0
b.delegate.Reset()
}
var fastRetries = false
@ -69,13 +101,22 @@ func (be *Backend) retry(ctx context.Context, msg string, f func() error) error
}
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = be.MaxElapsedTime
bo.InitialInterval = 1 * time.Second
bo.Multiplier = 2
if fastRetries {
// speed up integration tests
bo.InitialInterval = 1 * time.Millisecond
maxElapsedTime := 200 * time.Millisecond
if bo.MaxElapsedTime > maxElapsedTime {
bo.MaxElapsedTime = maxElapsedTime
}
}
err := retryNotifyErrorWithSuccess(f,
backoff.WithContext(backoff.WithMaxRetries(bo, uint64(be.MaxTries)), ctx),
backoff.WithContext(withRetryAtLeastOnce(bo), ctx),
func(err error, d time.Duration) {
if be.Report != nil {
be.Report(msg, err, d)

View File

@ -192,8 +192,9 @@ func TestBackendListRetryErrorBackend(t *testing.T) {
}
TestFastRetries(t)
const maxRetries = 2
retryBackend := New(be, maxRetries, nil, nil)
const maxElapsedTime = 10 * time.Millisecond
now := time.Now()
retryBackend := New(be, maxElapsedTime, nil, nil)
var listed []string
err := retryBackend.List(context.TODO(), backend.PackFile, func(fi backend.FileInfo) error {
@ -206,8 +207,9 @@ func TestBackendListRetryErrorBackend(t *testing.T) {
t.Fatalf("wrong error returned, want %v, got %v", ErrBackendTest, err)
}
if retries != maxRetries+1 {
t.Fatalf("List was called %d times, wanted %v", retries, maxRetries+1)
duration := time.Since(now)
if duration > 100*time.Millisecond {
t.Fatalf("list retries took %v, expected at most 10ms", duration)
}
test.Equals(t, names[:2], listed)

View File

@ -132,7 +132,7 @@ func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lock
// remove the lock from the repo
debug.Log("unlocking repository with lock %v", lock)
if err := lock.Unlock(); err != nil {
if err := lock.Unlock(ctx); err != nil {
debug.Log("error while unlocking: %v", err)
logger("error while unlocking: %v", err)
}

View File

@ -18,6 +18,10 @@ import (
"github.com/restic/restic/internal/debug"
)
// UnlockCancelDelay bounds how long lock cleanup operations may keep running
// after the context passed to them has been canceled.
const UnlockCancelDelay time.Duration = 1 * time.Minute
// Lock represents a process locking the repository for an operation.
//
// There are two types of locks: exclusive and non-exclusive. There may be many
@ -137,7 +141,7 @@ func newLock(ctx context.Context, repo Repository, excl bool) (*Lock, error) {
time.Sleep(waitBeforeLockCheck)
if err = lock.checkForOtherLocks(ctx); err != nil {
_ = lock.Unlock()
_ = lock.Unlock(ctx)
return nil, err
}
@ -221,12 +225,15 @@ func (l *Lock) createLock(ctx context.Context) (ID, error) {
}
// Unlock removes the lock from the repository.
func (l *Lock) Unlock() error {
func (l *Lock) Unlock(ctx context.Context) error {
if l == nil || l.lockID == nil {
return nil
}
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: l.lockID.String()})
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
return l.repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: l.lockID.String()})
}
var StaleLockTimeout = 30 * time.Minute
@ -267,6 +274,23 @@ func (l *Lock) Stale() bool {
return false
}
// delayedCancelContext returns a context whose cancellation lags behind that
// of parentCtx: once parentCtx is done, the returned context is canceled
// after delay has elapsed. The returned context is derived from
// context.Background(), so it carries neither the values nor the deadline of
// parentCtx.
//
// The returned CancelFunc must be called once the operation has finished; it
// cancels the context immediately and lets the watcher goroutine exit.
func delayedCancelContext(parentCtx context.Context, delay time.Duration) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		select {
		case <-parentCtx.Done():
		case <-ctx.Done():
			// The caller already canceled the context itself, there is
			// nothing left to watch. Returning here (instead of merely
			// leaving the select) keeps the goroutine from lingering for
			// the whole delay.
			return
		}

		// The parent was canceled: grant the grace period, but stop waiting
		// as soon as the caller cancels the context on its own.
		timer := time.NewTimer(delay)
		defer timer.Stop()
		select {
		case <-timer.C:
			cancel()
		case <-ctx.Done():
		}
	}()

	return ctx, cancel
}
// Refresh refreshes the lock by creating a new file in the backend with a new
// timestamp. Afterwards the old lock is removed.
func (l *Lock) Refresh(ctx context.Context) error {
@ -286,7 +310,10 @@ func (l *Lock) Refresh(ctx context.Context) error {
oldLockID := l.lockID
l.lockID = &id
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: oldLockID.String()})
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
return l.repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: oldLockID.String()})
}
// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
@ -313,15 +340,19 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
time.Sleep(waitBeforeLockCheck)
exists, err = l.checkExistence(ctx)
ctx, cancel := delayedCancelContext(ctx, UnlockCancelDelay)
defer cancel()
if err != nil {
// cleanup replacement lock
_ = l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: id.String()})
_ = l.repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: id.String()})
return err
}
if !exists {
// cleanup replacement lock
_ = l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: id.String()})
_ = l.repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: id.String()})
return ErrRemovedLock
}
@ -332,7 +363,7 @@ func (l *Lock) RefreshStaleLock(ctx context.Context) error {
oldLockID := l.lockID
l.lockID = &id
return l.repo.Backend().Remove(context.TODO(), backend.Handle{Type: LockFile, Name: oldLockID.String()})
return l.repo.Backend().Remove(ctx, backend.Handle{Type: LockFile, Name: oldLockID.String()})
}
func (l *Lock) checkExistence(ctx context.Context) (bool, error) {

View File

@ -22,7 +22,7 @@ func TestLock(t *testing.T) {
lock, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock())
rtest.OK(t, lock.Unlock(context.TODO()))
}
func TestDoubleUnlock(t *testing.T) {
@ -32,9 +32,9 @@ func TestDoubleUnlock(t *testing.T) {
lock, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err)
rtest.OK(t, lock.Unlock())
rtest.OK(t, lock.Unlock(context.TODO()))
err = lock.Unlock()
err = lock.Unlock(context.TODO())
rtest.Assert(t, err != nil,
"double unlock didn't return an error, got %v", err)
}
@ -49,8 +49,8 @@ func TestMultipleLock(t *testing.T) {
lock2, err := restic.NewLock(context.TODO(), repo)
rtest.OK(t, err)
rtest.OK(t, lock1.Unlock())
rtest.OK(t, lock2.Unlock())
rtest.OK(t, lock1.Unlock(context.TODO()))
rtest.OK(t, lock2.Unlock(context.TODO()))
}
type failLockLoadingBackend struct {
@ -75,7 +75,7 @@ func TestMultipleLockFailure(t *testing.T) {
_, err = restic.NewLock(context.TODO(), repo)
rtest.Assert(t, err != nil, "unreadable lock file did not result in an error")
rtest.OK(t, lock1.Unlock())
rtest.OK(t, lock1.Unlock(context.TODO()))
}
func TestLockExclusive(t *testing.T) {
@ -83,7 +83,7 @@ func TestLockExclusive(t *testing.T) {
elock, err := restic.NewExclusiveLock(context.TODO(), repo)
rtest.OK(t, err)
rtest.OK(t, elock.Unlock())
rtest.OK(t, elock.Unlock(context.TODO()))
}
func TestLockOnExclusiveLockedRepo(t *testing.T) {
@ -99,8 +99,8 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) {
rtest.Assert(t, restic.IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock())
rtest.OK(t, elock.Unlock())
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock(context.TODO()))
}
func TestExclusiveLockOnLockedRepo(t *testing.T) {
@ -116,8 +116,8 @@ func TestExclusiveLockOnLockedRepo(t *testing.T) {
rtest.Assert(t, restic.IsAlreadyLocked(err),
"create normal lock with exclusively locked repo didn't return the correct error")
rtest.OK(t, lock.Unlock())
rtest.OK(t, elock.Unlock())
rtest.OK(t, lock.Unlock(context.TODO()))
rtest.OK(t, elock.Unlock(context.TODO()))
}
func createFakeLock(repo restic.SaverUnpacked, t time.Time, pid int) (restic.ID, error) {
@ -294,7 +294,7 @@ func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
rtest.OK(t, err)
rtest.Assert(t, lock2.Time.After(time0),
"expected a later timestamp after lock refresh")
rtest.OK(t, lock.Unlock())
rtest.OK(t, lock.Unlock(context.TODO()))
}
func TestLockRefresh(t *testing.T) {