package repository import ( "context" "fmt" "math" "sort" "" "" "" "" "" ) var ErrIndexIncomplete = errors.Fatal("index is not complete") var ErrPacksMissing = errors.Fatal("packs from index missing in repo") var ErrSizeNotMatching = errors.Fatal("pack size does not match calculated size from index") // PruneOptions collects all options for the cleanup command. type PruneOptions struct { DryRun bool UnsafeRecovery bool MaxUnusedBytes func(used uint64) (unused uint64) // calculates the number of unused bytes after repacking, according to MaxUnused MaxRepackBytes uint64 RepackCachableOnly bool RepackSmall bool RepackUncompressed bool } type PruneStats struct { Blobs struct { Used uint Duplicate uint Unused uint Remove uint Repack uint Repackrm uint } Size struct { Used uint64 Duplicate uint64 Unused uint64 Remove uint64 Repack uint64 Repackrm uint64 Unref uint64 Uncompressed uint64 } Packs struct { Used uint Unused uint PartlyUsed uint Unref uint Keep uint Repack uint Remove uint } } type PrunePlan struct { removePacksFirst restic.IDSet // packs to remove first (unreferenced packs) repackPacks restic.IDSet // packs to repack keepBlobs restic.CountedBlobSet // blobs to keep during repacking removePacks restic.IDSet // packs to remove ignorePacks restic.IDSet // packs to ignore when rebuilding the index repo restic.Repository stats PruneStats opts PruneOptions } type packInfo struct { usedBlobs uint unusedBlobs uint usedSize uint64 unusedSize uint64 tpe restic.BlobType uncompressed bool } type packInfoWithID struct { ID restic.ID packInfo mustCompress bool } // PlanPrune selects which files to rewrite and which to delete and which blobs to keep. // Also some summary statistics are returned. func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { var stats PruneStats if opts.UnsafeRecovery { // prevent repacking data to make sure users cannot get stuck. opts.MaxRepackBytes = 0 } if repo.Connections() < 2 { return nil, fmt.Errorf("prune requires a backend connection limit of at least two") } if repo.Config().Version < 2 && opts.RepackUncompressed { return nil, fmt.Errorf("compression requires at least repository format version 2") } usedBlobs, err := getUsedBlobs(ctx, repo) if err != nil { return nil, err } printer.P("searching used packs...\n") keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats, printer) if err != nil { return nil, err } printer.P("collecting packs for deletion and repacking\n") plan, err := decidePackAction(ctx, opts, repo, indexPack, &stats, printer) if err != nil { return nil, err } if len(plan.repackPacks) != 0 { blobCount := keepBlobs.Len() // when repacking, we do not want to keep blobs which are // already contained in kept packs, so delete them from keepBlobs repo.Index().Each(ctx, func(blob restic.PackedBlob) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { return } keepBlobs.Delete(blob.BlobHandle) }) if ctx.Err() != nil { return nil, ctx.Err() } if keepBlobs.Len() < blobCount/2 { // replace with copy to shrink map to necessary size if there's a chance to benefit keepBlobs = keepBlobs.Copy() } } else { // keepBlobs is only needed if packs are repacked keepBlobs = nil } plan.keepBlobs = keepBlobs plan.repo = repo plan.stats = stats plan.opts = opts return &plan, nil } func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { // iterate over all blobs in index to find out which blobs are duplicates // The counter in usedBlobs describes how many instances of the blob exist in the repository index // Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist idx.Each(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] if ok { if count < math.MaxUint8 { // don't overflow, but saturate count at 255 // this can lead to a non-optimal pack selection, but won't cause // problems otherwise count++ } usedBlobs[bh] = count } }) if ctx.Err() != nil { return nil, nil, ctx.Err() } // Check if all used blobs have been found in index missingBlobs := restic.NewBlobSet() for bh, count := range usedBlobs { if count == 0 { // blob does not exist in any pack files missingBlobs.Insert(bh) } } if len(missingBlobs) != 0 { printer.E("%v not found in the index\n\n"+ "Integrity check failed: Data seems to be missing.\n"+ "Will not start prune to prevent (additional) data loss!\n"+ "Please report this error (along with the output of the 'prune' run) at\n"+ "\n", missingBlobs) return nil, nil, ErrIndexIncomplete } indexPack := make(map[restic.ID]packInfo) // save computed pack header size for pid, hdrSize := range pack.Size(ctx, idx, true) { // initialize tpe with NumBlobTypes to indicate it's not set indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)} } hasDuplicates := false // iterate over all blobs in index to generate packInfo idx.Each(ctx, func(blob restic.PackedBlob) { ip := indexPack[blob.PackID] // Set blob type if not yet set if ip.tpe == restic.NumBlobTypes { ip.tpe = blob.Type } // mark mixed packs with "Invalid blob type" if ip.tpe != blob.Type { ip.tpe = restic.InvalidBlob } bh := blob.BlobHandle size := uint64(blob.Length) dupCount := usedBlobs[bh] switch { case dupCount >= 2: hasDuplicates = true // mark as unused for now, we will later on select one copy ip.unusedSize += size ip.unusedBlobs++ // count as duplicate, will later on change one copy to be counted as used stats.Size.Duplicate += size stats.Blobs.Duplicate++ case dupCount == 1: // used blob, not duplicate ip.usedSize += size ip.usedBlobs++ stats.Size.Used += size stats.Blobs.Used++ default: // unused blob ip.unusedSize += size ip.unusedBlobs++ stats.Size.Unused += size stats.Blobs.Unused++ } if !blob.IsCompressed() { ip.uncompressed = true } // update indexPack indexPack[blob.PackID] = ip }) if ctx.Err() != nil { return nil, nil, ctx.Err() } // if duplicate blobs exist, those will be set to either "used" or "unused": // - mark only one occurrence of duplicate blobs as used // - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used" // - if there are no used blobs in a pack, possibly mark duplicates as "unused" if hasDuplicates { // iterate again over all blobs in index (this is pretty cheap, all in-mem) idx.Each(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] // skip non-duplicate, aka. normal blobs // count == 0 is used to mark that this was a duplicate blob with only a single occurrence remaining if !ok || count == 1 { return } ip := indexPack[blob.PackID] size := uint64(blob.Length) switch { case ip.usedBlobs > 0, count == 0: // other used blobs in pack or "last" occurrence -> transition to used ip.usedSize += size ip.usedBlobs++ ip.unusedSize -= size ip.unusedBlobs-- // same for the global statistics stats.Size.Used += size stats.Blobs.Used++ stats.Size.Duplicate -= size stats.Blobs.Duplicate-- // let other occurrences remain marked as unused usedBlobs[bh] = 1 default: // remain unused and decrease counter count-- if count == 1 { // setting count to 1 would lead to forgetting that this blob had duplicates // thus use the special value zero. This will select the last instance of the blob for keeping. count = 0 } usedBlobs[bh] = count } // update indexPack indexPack[blob.PackID] = ip }) } if ctx.Err() != nil { return nil, nil, ctx.Err() } // Sanity check. If no duplicates exist, all blobs have value 1. After handling // duplicates, this also applies to duplicates. for _, count := range usedBlobs { if count != 1 { panic("internal error during blob selection") } } return usedBlobs, indexPack, nil } func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) { removePacksFirst := restic.NewIDSet() removePacks := restic.NewIDSet() repackPacks := restic.NewIDSet() var repackCandidates []packInfoWithID var repackSmallCandidates []packInfoWithID repoVersion := repo.Config().Version // only repack very small files by default targetPackSize := repo.PackSize() / 25 if opts.RepackSmall { // consider files with at least 80% of the target size as large enough targetPackSize = repo.PackSize() / 5 * 4 } // loop over all packs and decide what to do bar := printer.NewCounter("packs processed") bar.SetMax(uint64(len(indexPack))) err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { p, ok := indexPack[id] if !ok { // Pack was not referenced in index and is not used => immediately remove! printer.V("will remove pack %v as it is unused and not indexed\n", id.Str()) removePacksFirst.Insert(id) stats.Size.Unref += uint64(packSize) return nil } if p.unusedSize+p.usedSize != uint64(packSize) && p.usedBlobs != 0 { // Pack size does not fit and pack is needed => error // If the pack is not needed, this is no error, the pack can // and will be simply removed, see below. printer.E("pack %s: calculated size %d does not match real size %d\nRun 'restic repair index'.\n", id.Str(), p.unusedSize+p.usedSize, packSize) return ErrSizeNotMatching } // statistics switch { case p.usedBlobs == 0: stats.Packs.Unused++ case p.unusedBlobs == 0: stats.Packs.Used++ default: stats.Packs.PartlyUsed++ } if p.uncompressed { stats.Size.Uncompressed += p.unusedSize + p.usedSize } mustCompress := false if repoVersion >= 2 { // repo v2: always repack tree blobs if uncompressed // compress data blobs if requested mustCompress = (p.tpe == restic.TreeBlob || opts.RepackUncompressed) && p.uncompressed } // decide what to do switch { case p.usedBlobs == 0: // All blobs in pack are no longer used => remove pack! removePacks.Insert(id) stats.Blobs.Remove += p.unusedBlobs stats.Size.Remove += p.unusedSize case opts.RepackCachableOnly && p.tpe == restic.DataBlob: // if this is a data pack and --repack-cacheable-only is set => keep pack! stats.Packs.Keep++ case p.unusedBlobs == 0 && p.tpe != restic.InvalidBlob && !mustCompress: if packSize >= int64(targetPackSize) { // All blobs in pack are used and not mixed => keep pack! stats.Packs.Keep++ } else { repackSmallCandidates = append(repackSmallCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress}) } default: // all other packs are candidates for repacking repackCandidates = append(repackCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress}) } delete(indexPack, id) bar.Add(1) return nil }) bar.Done() if err != nil { return PrunePlan{}, err } // At this point indexPacks contains only missing packs! // missing packs that are not needed can be ignored ignorePacks := restic.NewIDSet() for id, p := range indexPack { if p.usedBlobs == 0 { ignorePacks.Insert(id) stats.Blobs.Remove += p.unusedBlobs stats.Size.Remove += p.unusedSize delete(indexPack, id) } } if len(indexPack) != 0 { printer.E("The index references %d needed pack files which are missing from the repository:\n", len(indexPack)) for id := range indexPack { printer.E(" %v\n", id) } return PrunePlan{}, ErrPacksMissing } if len(ignorePacks) != 0 { printer.E("Missing but unneeded pack files are referenced in the index, will be repaired\n") for id := range ignorePacks { printer.E("will forget missing pack file %v\n", id) } } if len(repackSmallCandidates) < 10 { // too few small files to be worth the trouble, this also prevents endlessly repacking // if there is just a single pack file below the target size stats.Packs.Keep += uint(len(repackSmallCandidates)) } else { repackCandidates = append(repackCandidates, repackSmallCandidates...) } // Sort repackCandidates such that packs with highest ratio unused/used space are picked first. // This is equivalent to sorting by unused / total space. // Instead of unused[i] / used[i] > unused[j] / used[j] we use // unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64 // Moreover packs containing trees and too small packs are sorted to the beginning sort.Slice(repackCandidates, func(i, j int) bool { pi := repackCandidates[i].packInfo pj := repackCandidates[j].packInfo switch { case pi.tpe != restic.DataBlob && pj.tpe == restic.DataBlob: return true case pj.tpe != restic.DataBlob && pi.tpe == restic.DataBlob: return false case pi.unusedSize+pi.usedSize < uint64(targetPackSize) && pj.unusedSize+pj.usedSize >= uint64(targetPackSize): return true case pj.unusedSize+pj.usedSize < uint64(targetPackSize) && pi.unusedSize+pi.usedSize >= uint64(targetPackSize): return false } return pi.unusedSize*pj.usedSize > pj.unusedSize*pi.usedSize }) repack := func(id restic.ID, p packInfo) { repackPacks.Insert(id) stats.Blobs.Repack += p.unusedBlobs + p.usedBlobs stats.Size.Repack += p.unusedSize + p.usedSize stats.Blobs.Repackrm += p.unusedBlobs stats.Size.Repackrm += p.unusedSize if p.uncompressed { stats.Size.Uncompressed -= p.unusedSize + p.usedSize } } // calculate limit for number of unused bytes in the repo after repacking maxUnusedSizeAfter := opts.MaxUnusedBytes(stats.Size.Used) for _, p := range repackCandidates { reachedUnusedSizeAfter := (stats.Size.Unused-stats.Size.Remove-stats.Size.Repackrm < maxUnusedSizeAfter) reachedRepackSize := stats.Size.Repack+p.unusedSize+p.usedSize >= opts.MaxRepackBytes packIsLargeEnough := p.unusedSize+p.usedSize >= uint64(targetPackSize) switch { case reachedRepackSize: stats.Packs.Keep++ case p.tpe != restic.DataBlob, p.mustCompress: // repacking non-data packs / uncompressed-trees is only limited by repackSize repack(p.ID, p.packInfo) case reachedUnusedSizeAfter && packIsLargeEnough: // for all other packs stop repacking if tolerated unused size is reached. stats.Packs.Keep++ default: repack(p.ID, p.packInfo) } } stats.Packs.Unref = uint(len(removePacksFirst)) stats.Packs.Repack = uint(len(repackPacks)) stats.Packs.Remove = uint(len(removePacks)) if repo.Config().Version < 2 { // compression not supported for repository format version 1 stats.Size.Uncompressed = 0 } return PrunePlan{removePacksFirst: removePacksFirst, removePacks: removePacks, repackPacks: repackPacks, ignorePacks: ignorePacks, }, nil } func (plan *PrunePlan) Stats() PruneStats { return plan.stats } // Execute does the actual pruning: // - remove unreferenced packs first // - repack given pack files while keeping the given blobs // - rebuild the index while ignoring all files that will be deleted // - delete the files // plan.removePacks and plan.ignorePacks are modified in this function. func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) { if plan.opts.DryRun { printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n") if len(plan.removePacksFirst) > 0 { printer.V("Would have removed the following unreferenced packs:\n%v\n\n", plan.removePacksFirst) } printer.V("Would have repacked and removed the following packs:\n%v\n\n", plan.repackPacks) printer.V("Would have removed the following no longer used packs:\n%v\n\n", plan.removePacks) // Always quit here if DryRun was set! return nil } repo := plan.repo // make sure the plan can only be used once plan.repo = nil // unreferenced packs can be safely deleted first if len(plan.removePacksFirst) != 0 { printer.P("deleting unreferenced packs\n") _ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer) } if ctx.Err() != nil { return ctx.Err() } if len(plan.repackPacks) != 0 { printer.P("repacking packs\n") bar := printer.NewCounter("packs repacked") bar.SetMax(uint64(len(plan.repackPacks))) _, err := Repack(ctx, repo, repo, plan.repackPacks, plan.keepBlobs, bar) bar.Done() if err != nil { return errors.Fatal(err.Error()) } // Also remove repacked packs plan.removePacks.Merge(plan.repackPacks) if len(plan.keepBlobs) != 0 { printer.E("%v was not repacked\n\n"+ "Integrity check failed.\n"+ "Please report this error (along with the output of the 'prune' run) at\n"+ "\n", plan.keepBlobs) return errors.Fatal("internal error: blobs were not repacked") } // allow GC of the blob set plan.keepBlobs = nil } if len(plan.ignorePacks) == 0 { plan.ignorePacks = plan.removePacks } else { plan.ignorePacks.Merge(plan.removePacks) } if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") indexFiles := repo.Index().(*index.MasterIndex).IDs() err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } } else if len(plan.ignorePacks) != 0 { err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer) if err != nil { return errors.Fatalf("%s", err) } } if len(plan.removePacks) != 0 { printer.P("removing %d old packs\n", len(plan.removePacks)) _ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer) } if ctx.Err() != nil { return ctx.Err() } if plan.opts.UnsafeRecovery { err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) if err != nil { return errors.Fatalf("%s", err) } } if err != nil { return err } // drop outdated in-memory index repo.ClearIndex() printer.P("done\n") return nil } // deleteFiles deletes the given fileList of fileType in parallel // if ignoreError=true, it will print a warning if there was an error, else it will abort. func deleteFiles(ctx context.Context, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error { bar := printer.NewCounter("files deleted") defer bar.Done() return restic.ParallelRemove(ctx, repo, fileList, fileType, func(id restic.ID, err error) error { if err != nil { printer.E("unable to remove %v/%v from the repository\n", fileType, id) if !ignoreError { return err } } printer.VV("removed %v/%v\n", fileType, id) return nil }, bar) }