restic/internal/ui/backup/progress.go
Matt Armstrong 0372c7ef04 Improve the ETA displayed during backup
The ETA restic displays was based on a rate computed across the entire
backup operation. Often restic can progress at uneven rates. In the worst
case, restic progresses over most of the backup at a very high rate and
then finds new data to back up. The displayed ETA is then unrealistic and
never adapts.

Restic now estimates the transfer rate based on a sliding window, with the
goal of adapting to observed changes in rate. To avoid wild changes in the
estimate, several heuristics are used to keep the sliding window wide
enough to be relatively stable.
2023-06-08 20:05:56 +02:00

220 lines
5.1 KiB
Go

package backup
import (
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
)
// ProgressPrinter is the output backend used by Progress to show status
// lines, per-item messages and the final summary.
//
// Implementations must tolerate calls from concurrent goroutines.
type ProgressPrinter interface {
	// Update receives the periodic status: scanned totals, processed
	// counters, the number of errors so far, the set of files currently
	// being worked on, the start time of the backup and the estimated
	// number of seconds remaining (0 when no estimate is available).
	Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64)
	// Reset is invoked by Progress on the final update.
	Reset()

	// Error reports an error for item; Progress.Error hands the returned
	// value back to its caller.
	Error(item string, err error) error
	// ScannerError is not called from this file.
	// NOTE(review): presumably the scanner's counterpart to Error — confirm.
	ScannerError(item string, err error) error

	// CompleteItem reports a finished item. Progress passes one of the
	// message types "dir new", "dir unchanged", "dir modified", "file new",
	// "file unchanged" or "file modified".
	CompleteItem(messageType string, item string, s archiver.ItemStats, d time.Duration)
	// ReportTotal is called by Progress once the scan has finished.
	ReportTotal(start time.Time, s archiver.ScanStats)
	// Finish is called by Progress.Finish with the accumulated summary.
	Finish(snapshotID restic.ID, start time.Time, summary *Summary, dryRun bool)

	// P and V print formatted messages.
	// NOTE(review): presumably normal and verbose output — confirm.
	P(msg string, args ...interface{})
	V(msg string, args ...interface{})
}
// Counter bundles the file, directory and byte counts that are tracked for
// both the scanned total and the amount processed so far.
type Counter struct {
	Files uint64
	Dirs  uint64
	Bytes uint64
}
// Summary accumulates the result of a backup run: how many files and
// directories were new, changed or unchanged, the number of bytes
// processed, and the embedded item statistics.
type Summary struct {
	// Files counts completed items of type "file" by change status.
	Files struct {
		New       uint
		Changed   uint
		Unchanged uint
	}
	// Dirs counts completed items of type "dir" by change status.
	Dirs struct {
		New       uint
		Changed   uint
		Unchanged uint
	}
	// ProcessedBytes is the sum of the sizes of all completed nodes.
	ProcessedBytes uint64
	archiver.ItemStats
}
// Progress collects the state of a running `backup` command and forwards it
// to a ProgressPrinter.
type Progress struct {
	progress.Updater

	// mu is held by the methods that run during the backup while they read
	// or modify the flags, counters, currentFiles and summary below.
	mu sync.Mutex

	// start is the time at which this Progress was created.
	start time.Time
	// estimator provides the transfer rate used for the ETA.
	estimator rateEstimator

	// scanStarted enables status updates; scanFinished enables the ETA.
	scanStarted  bool
	scanFinished bool

	// currentFiles is the set of files currently being processed.
	currentFiles map[string]struct{}
	// processed is what has been handled so far, total what the scan found.
	processed Counter
	total     Counter
	// errors counts the calls to Error.
	errors uint

	summary Summary
	printer ProgressPrinter
}
// NewProgress creates a Progress that reports to printer and installs a
// periodic update callback that fires every interval.
//
// On the final update the printer is reset. On every other update, once any
// progress or scan event has been seen, the printer receives the current
// counters together with an estimate of the remaining seconds. The estimate
// is 0 until the scan has finished, and also whenever no meaningful value
// can be computed (no usable rate, or nothing left to do).
func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress {
	// Use one timestamp so the start time and the estimator agree.
	now := time.Now()
	p := &Progress{
		start:        now,
		currentFiles: make(map[string]struct{}),
		printer:      printer,
		estimator:    *newRateEstimator(now),
	}
	p.Updater = *progress.NewUpdater(interval, func(runtime time.Duration, final bool) {
		if final {
			p.printer.Reset()
			return
		}

		p.mu.Lock()
		defer p.mu.Unlock()
		if !p.scanStarted {
			return
		}

		var secondsRemaining uint64
		if p.scanFinished {
			rate := p.estimator.rate(time.Now())
			// `rate > 0` also rejects NaN, which a `rate <= 0` check would
			// let through to the float-to-integer conversion below.
			//
			// processed may exceed total (e.g. files that grew after they
			// were scanned); the unsigned subtraction would then wrap
			// around and yield an absurdly large ETA, so report 0 instead.
			if rate > 0 && p.total.Bytes > p.processed.Bytes {
				todo := float64(p.total.Bytes - p.processed.Bytes)
				secondsRemaining = uint64(todo / rate)
			}
		}
		p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining)
	})
	return p
}
// Error is the error callback function for the archiver. It counts the
// error, marks progress reporting as started and returns whatever the
// printer's Error method returns for the same item and error.
func (p *Progress) Error(item string, err error) error {
	p.mu.Lock()
	p.scanStarted = true
	p.errors++
	p.mu.Unlock()

	// The printer is called without holding the lock.
	result := p.printer.Error(item, err)
	return result
}
// StartFile records that a worker has begun processing filename by adding
// it to the set of files currently in progress.
func (p *Progress) StartFile(filename string) {
	var marker struct{}

	p.mu.Lock()
	defer p.mu.Unlock()

	p.currentFiles[filename] = marker
}
// addProcessed adds c to the processed counters, feeds the byte count to
// the rate estimator and marks progress reporting as started.
//
// It does no locking itself; the callers visible in this file hold p.mu.
func (p *Progress) addProcessed(c Counter) {
	done := &p.processed
	done.Files += c.Files
	done.Dirs += c.Dirs
	done.Bytes += c.Bytes

	p.estimator.recordBytes(time.Now(), c.Bytes)
	p.scanStarted = true
}
// CompleteBlob is called for all saved blobs for files; it adds the blob's
// size to the processed byte count.
func (p *Progress) CompleteBlob(bytes uint64) {
	delta := Counter{Bytes: bytes}

	p.mu.Lock()
	defer p.mu.Unlock()
	p.addProcessed(delta)
}
// CompleteItem is the archiver's status callback for a finished file or
// directory.
//
// The item statistics s are always added to the summary. When current is
// nil (the last item "/", or an item for which an error occurred) the item
// is only removed from the set of files in progress. Otherwise the node's
// size is added to the processed bytes and, for nodes of type "dir" or
// "file", the processed counters are advanced, the printer is told whether
// the item is new, unchanged or modified compared to previous, and the
// matching summary counter is incremented. Other node types are not
// reported to the printer.
func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
	p.mu.Lock()
	p.summary.ItemStats.Add(s)
	if current != nil {
		p.summary.ProcessedBytes += current.Size
	}
	p.mu.Unlock()

	if current == nil {
		// nothing to classify; just drop the status line for this item
		p.mu.Lock()
		delete(p.currentFiles, item)
		p.mu.Unlock()
		return
	}

	// report prints the completion message without holding the lock and
	// afterwards increments the given summary counter under the lock.
	report := func(messageType string, counter *uint) {
		p.printer.CompleteItem(messageType, item, s, d)
		p.mu.Lock()
		*counter++
		p.mu.Unlock()
	}

	switch current.Type {
	case "dir":
		p.mu.Lock()
		p.addProcessed(Counter{Dirs: 1})
		p.mu.Unlock()

		tally := &p.summary.Dirs
		switch {
		case previous == nil:
			report("dir new", &tally.New)
		case previous.Equals(*current):
			report("dir unchanged", &tally.Unchanged)
		default:
			report("dir modified", &tally.Changed)
		}

	case "file":
		p.mu.Lock()
		p.addProcessed(Counter{Files: 1})
		delete(p.currentFiles, item)
		p.mu.Unlock()

		tally := &p.summary.Files
		switch {
		case previous == nil:
			report("file new", &tally.New)
		case previous.Equals(*current):
			report("file unchanged", &tally.Unchanged)
		default:
			report("file modified", &tally.Changed)
		}
	}
}
// ReportTotal stores the scan statistics gathered so far as the current
// total. An empty item marks the end of the scan: the scan is then flagged
// as finished and the totals are passed on to the printer.
func (p *Progress) ReportTotal(item string, s archiver.ScanStats) {
	scanned := Counter{
		Files: uint64(s.Files),
		Dirs:  uint64(s.Dirs),
		Bytes: s.Bytes,
	}

	p.mu.Lock()
	defer p.mu.Unlock()

	p.total = scanned
	p.scanStarted = true

	if item != "" {
		return
	}
	p.scanFinished = true
	p.printer.ReportTotal(p.start, s)
}
// Finish stops the periodic updates and then hands the snapshot ID, start
// time and accumulated summary to the printer for the closing messages.
func (p *Progress) Finish(snapshotID restic.ID, dryrun bool) {
	// Done blocks until the status update goroutine has shut down.
	p.Updater.Done()

	result := &p.summary
	p.printer.Finish(snapshotID, p.start, result, dryrun)
}