diff --git a/internal/cache/backend.go b/internal/cache/backend.go index eeb67e1c0..89aae51dc 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -3,6 +3,7 @@ package cache import ( "context" "io" + "sync" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -12,11 +13,25 @@ import ( type Backend struct { restic.Backend *Cache + + // inProgress contains the handle for all files that are currently + // downloaded. The channel in the value is closed as soon as the download + // is finished. + inProgressMutex sync.Mutex + inProgress map[restic.Handle]chan struct{} } // ensure cachedBackend implements restic.Backend var _ restic.Backend = &Backend{} +func newBackend(be restic.Backend, c *Cache) *Backend { + return &Backend{ + Backend: be, + Cache: c, + inProgress: make(map[restic.Handle]chan struct{}), + } +} + // Remove deletes a file from the backend and the cache if it has been cached. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { debug.Log("cache Remove(%v)", h) @@ -83,6 +98,30 @@ var autoCacheFiles = map[restic.FileType]bool{ } func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { + finish := make(chan struct{}) + defer func() { + close(finish) + + // remove the finish channel from the map + b.inProgressMutex.Lock() + delete(b.inProgress, h) + b.inProgressMutex.Unlock() + }() + + b.inProgressMutex.Lock() + other, alreadyDownloading := b.inProgress[h] + if !alreadyDownloading { + b.inProgress[h] = finish + } + b.inProgressMutex.Unlock() + + if alreadyDownloading { + debug.Log("readahead %v is already performed by somebody else, delegating...", h) + <-other + debug.Log("download %v finished", h) + return nil + } + rd, err := b.Backend.Load(ctx, h, 0, 0) if err != nil { return err @@ -101,8 +140,29 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { return nil } +// loadFromCacheOrDelegate will try to load the file from the cache, and fall +// back to the backend if that fails. +func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + rd, err := b.Cache.Load(h, length, offset) + if err == nil { + return rd, nil + } + + return b.Backend.Load(ctx, h, length, offset) +} + // Load loads a file from the cache or the backend. func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + b.inProgressMutex.Lock() + waitForFinish, inProgress := b.inProgress[h] + b.inProgressMutex.Unlock() + + if inProgress { + debug.Log("downloading %v is already in progress, waiting for finish", h) + <-waitForFinish + debug.Log("downloading %v finished", h) + } + if b.Cache.Has(h) { debug.Log("Load(%v, %v, %v) from cache", h, length, offset) rd, err := b.Cache.Load(h, length, offset) @@ -116,9 +176,10 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset if offset != 0 || length != 0 { if b.Cache.PerformReadahead(h) { debug.Log("performing readahead for %v", h) + err := b.cacheFile(ctx, h) if err == nil { - return b.Cache.Load(h, length, offset) + return b.loadFromCacheOrDelegate(ctx, h, length, offset) } debug.Log("error caching %v: %v", h, err) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 6fb3de1f7..616345ca2 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -155,10 +155,7 @@ func (c *Cache) IsNotExist(err error) bool { // Wrap returns a backend with a cache. func (c *Cache) Wrap(be restic.Backend) restic.Backend { - return &Backend{ - Backend: be, - Cache: c, - } + return newBackend(be, c) } // BaseDir returns the base directory.