restic/internal/repository/repack.go
Michael Eischer 120ccc8754 repository: Rework blob saving to use an async pack uploader
Previously, SaveAndEncrypt would assemble blobs into packs and either
return immediately if the pack was not yet full, or upload the pack file
otherwise. That upload blocked the current goroutine until it finished.

Now, the upload is done using separate goroutines. This requires changes
to the error handling. As uploads are no longer tied to a SaveAndEncrypt
call, failed uploads are signaled using an errgroup.

To account for the amount of uploaded data, the pack header overhead is
no longer returned by `packer.Finalize` but by the helper method
`packer.HeaderOverhead`. This keeps attributing the header overhead to
the `repository.SaveBlob` call responsible for it, which would otherwise
no longer be possible now that packs are finalized asynchronously.
2022-07-02 22:42:34 +02:00
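
The async upload described by the commit can be sketched with a plain errgroup:
uploader goroutines drain a queue of finished packs while a producer keeps
adding new ones, and the first failed upload cancels the shared context and
surfaces from Wait. The snippet below is only a minimal illustration of that
pattern, not restic's actual uploader; uploadPack, queue and the worker count
are made-up placeholders.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// uploadPack stands in for the real pack upload; it is a placeholder for this
// sketch only.
func uploadPack(ctx context.Context, packID int) error {
	// ... write the finalized pack file to the backend ...
	return nil
}

func main() {
	wg, wgCtx := errgroup.WithContext(context.Background())
	queue := make(chan int)

	// Uploader goroutines: the first failed upload cancels wgCtx, which also
	// stops the producer below.
	for i := 0; i < 2; i++ {
		wg.Go(func() error {
			for packID := range queue {
				if err := uploadPack(wgCtx, packID); err != nil {
					return err
				}
			}
			return nil
		})
	}

	// Producer: a caller such as SaveAndEncrypt only enqueues a finished pack
	// and returns immediately instead of blocking until it is uploaded.
	wg.Go(func() error {
		defer close(queue)
		for packID := 0; packID < 4; packID++ {
			select {
			case queue <- packID:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		return nil
	})

	// Upload failures are reported here rather than by the call that filled
	// the pack.
	if err := wg.Wait(); err != nil {
		fmt.Println("upload failed:", err)
	}
}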

128 lines
3.3 KiB
Go

package repository

import (
	"context"
	"sync"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"
	"github.com/restic/restic/internal/ui/progress"
	"golang.org/x/sync/errgroup"
)

const numRepackWorkers = 8

// Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs are saved
// into a new pack. Returned is the list of obsolete packs which can then
// be removed.
//
// The map keepBlobs is modified by Repack; it is used to keep track of which
// blobs have been processed.
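//
// When repo and dstRepo refer to the same repository, the backend must allow
// at least two concurrent connections so that the downloads and the pack
// uploads can run in parallel.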
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
	debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))

	if repo == dstRepo && dstRepo.Backend().Connections() < 2 {
		return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
	}
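
	// The pack uploader goroutines run on the same errgroup, so a failed
	// upload cancels wgCtx and is reported by wg.Wait below.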
	wg, wgCtx := errgroup.WithContext(ctx)
	dstRepo.StartPackUploader(wgCtx, wg)

	wg.Go(func() error {
		var err error
		obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
		return err
	})

	if err := wg.Wait(); err != nil {
		return nil, err
	}
	return obsoletePacks, nil
}
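
// repack lists the kept blobs of the given packs, downloads them with a pool
// of workers and stores them in dstRepo.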
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
	wg, wgCtx := errgroup.WithContext(ctx)

	var keepMutex sync.Mutex
	downloadQueue := make(chan restic.PackBlobs)
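
	// Producer: list the blobs in each pack and enqueue only those that still
	// need to be kept.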
	wg.Go(func() error {
		defer close(downloadQueue)
		for pbs := range repo.Index().ListPacks(wgCtx, packs) {
			var packBlobs []restic.Blob
			keepMutex.Lock()
			// filter out unnecessary blobs
			for _, entry := range pbs.Blobs {
				h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
				if keepBlobs.Has(h) {
					packBlobs = append(packBlobs, entry)
				}
			}
			keepMutex.Unlock()

			select {
			case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		return nil
	})
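
	// Worker: stream the requested blobs of a pack, re-check keepBlobs under
	// the mutex so each blob is copied only once, and save the kept blobs to
	// the destination repository.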
	worker := func() error {
		for t := range downloadQueue {
			err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
				if err != nil {
					return err
				}

				keepMutex.Lock()
				// recheck whether some other worker was faster
				shouldKeep := keepBlobs.Has(blob)
				if shouldKeep {
					keepBlobs.Delete(blob)
				}
				keepMutex.Unlock()

				if !shouldKeep {
					return nil
				}
				// We do want to save already saved blobs: the copies the index
				// already knows about may only exist in packs that are being
				// repacked and removed.
				_, _, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
				if err != nil {
					return err
				}
				debug.Log(" saved blob %v", blob.ID)
				return nil
			})
			if err != nil {
				return err
			}
			p.Add(1)
		}
		return nil
	}
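
	// Use at most numRepackWorkers download workers and no more than the
	// backend connection limit minus one, which leaves a connection for the
	// concurrent pack uploads.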
	connectionLimit := dstRepo.Backend().Connections() - 1
	if connectionLimit > numRepackWorkers {
		connectionLimit = numRepackWorkers
	}
	for i := 0; i < int(connectionLimit); i++ {
		wg.Go(worker)
	}

	if err := wg.Wait(); err != nil {
		return nil, err
	}
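
	// Flush any remaining incomplete packs to the destination repository
	// before reporting the source packs as obsolete.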
	if err := dstRepo.Flush(ctx); err != nil {
		return nil, err
	}
	return packs, nil
}