From 2e72b57f2f5d111da2f6133a00838abe351c5aac Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 25 Sep 2017 14:35:37 +0200 Subject: [PATCH 1/2] Correct debug message --- internal/restic/snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/restic/snapshot.go b/internal/restic/snapshot.go index f73be6937..47b123240 100644 --- a/internal/restic/snapshot.go +++ b/internal/restic/snapshot.go @@ -163,7 +163,7 @@ func (sn *Snapshot) HasTagList(l []TagList) bool { for _, tags := range l { if sn.HasTags(tags) { - debug.Log(" snapshot satisfies %v", tags, l) + debug.Log(" snapshot satisfies %v %v", tags, l) return true } } From 0bb2a8e0d09a471a1569057a83d18c85f98dd09f Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 25 Sep 2017 15:21:16 +0200 Subject: [PATCH 2/2] cache: Synchronize downloading This commit adds code to synchronize downloading files to the cache. Before, requests that came in for files currently downloading would fail because the file was not completed in the cache. Now, the code waits until the download is completed. Closes #1278 --- internal/cache/backend.go | 63 ++++++++++++++++++++++++++++++++++++++- internal/cache/cache.go | 5 +--- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/internal/cache/backend.go b/internal/cache/backend.go index eeb67e1c0..89aae51dc 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -3,6 +3,7 @@ package cache import ( "context" "io" + "sync" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -12,11 +13,25 @@ import ( type Backend struct { restic.Backend *Cache + + // inProgress contains the handle for all files that are currently + // downloaded. The channel in the value is closed as soon as the download + // is finished. + inProgressMutex sync.Mutex + inProgress map[restic.Handle]chan struct{} } // ensure cachedBackend implements restic.Backend var _ restic.Backend = &Backend{} +func newBackend(be restic.Backend, c *Cache) *Backend { + return &Backend{ + Backend: be, + Cache: c, + inProgress: make(map[restic.Handle]chan struct{}), + } +} + // Remove deletes a file from the backend and the cache if it has been cached. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { debug.Log("cache Remove(%v)", h) @@ -83,6 +98,30 @@ var autoCacheFiles = map[restic.FileType]bool{ } func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { + finish := make(chan struct{}) + defer func() { + close(finish) + + // remove the finish channel from the map + b.inProgressMutex.Lock() + delete(b.inProgress, h) + b.inProgressMutex.Unlock() + }() + + b.inProgressMutex.Lock() + other, alreadyDownloading := b.inProgress[h] + if !alreadyDownloading { + b.inProgress[h] = finish + } + b.inProgressMutex.Unlock() + + if alreadyDownloading { + debug.Log("readahead %v is already performed by somebody else, delegating...", h) + <-other + debug.Log("download %v finished", h) + return nil + } + rd, err := b.Backend.Load(ctx, h, 0, 0) if err != nil { return err @@ -101,8 +140,29 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error { return nil } +// loadFromCacheOrDelegate will try to load the file from the cache, and fall +// back to the backend if that fails. +func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + rd, err := b.Cache.Load(h, length, offset) + if err == nil { + return rd, nil + } + + return b.Backend.Load(ctx, h, length, offset) +} + // Load loads a file from the cache or the backend. func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + b.inProgressMutex.Lock() + waitForFinish, inProgress := b.inProgress[h] + b.inProgressMutex.Unlock() + + if inProgress { + debug.Log("downloading %v is already in progress, waiting for finish", h) + <-waitForFinish + debug.Log("downloading %v finished", h) + } + if b.Cache.Has(h) { debug.Log("Load(%v, %v, %v) from cache", h, length, offset) rd, err := b.Cache.Load(h, length, offset) @@ -116,9 +176,10 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset if offset != 0 || length != 0 { if b.Cache.PerformReadahead(h) { debug.Log("performing readahead for %v", h) + err := b.cacheFile(ctx, h) if err == nil { - return b.Cache.Load(h, length, offset) + return b.loadFromCacheOrDelegate(ctx, h, length, offset) } debug.Log("error caching %v: %v", h, err) diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 6fb3de1f7..616345ca2 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -155,10 +155,7 @@ func (c *Cache) IsNotExist(err error) bool { // Wrap returns a backend with a cache. func (c *Cache) Wrap(be restic.Backend) restic.Backend { - return &Backend{ - Backend: be, - Cache: c, - } + return newBackend(be, c) } // BaseDir returns the base directory.