restic/repository/master_index.go
Alexander Neumann f53008d916 Allow saving duplicate blobs in the repacker
This adds code to the master index to allow saving duplicate blobs
within the repacker. In this mode, only the list of currently in flight
blobs is consulted, and not the index. This correct because while
repacking, a unique list of blobs is saved again to the index.
2016-01-17 21:14:55 +01:00

315 lines
7.5 KiB
Go

package repository
import (
"fmt"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
idxMutex sync.RWMutex
inFlight struct {
backend.IDSet
sync.RWMutex
}
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
return &MasterIndex{
inFlight: struct {
backend.IDSet
sync.RWMutex
}{
IDSet: backend.NewIDSet(),
},
}
}
// Lookup queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Lookup(id backend.ID) (blob PackedBlob, err error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
debug.Log("MasterIndex.Lookup", "looking up id %v", id.Str())
for _, idx := range mi.idx {
blob, err = idx.Lookup(id)
if err == nil {
debug.Log("MasterIndex.Lookup",
"found id %v: %v", id.Str(), blob)
return
}
}
debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str())
return PackedBlob{}, fmt.Errorf("id %v not found in any index", id)
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(id backend.ID) (uint, error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
length, err := idx.LookupSize(id)
if err == nil {
return length, nil
}
}
return 0, fmt.Errorf("id %v not found in any index", id)
}
// ListPack returns the list of blobs in a pack. The first matching index is
// returned, or nil if no index contains information about the pack id.
func (mi *MasterIndex) ListPack(id backend.ID) (list []PackedBlob) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
list := idx.ListPack(id)
if len(list) > 0 {
return list
}
}
return nil
}
// Has queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Has(id backend.ID) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if idx.Has(id) {
return true
}
}
return false
}
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t pack.BlobType) (n uint) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
var sum uint
for _, idx := range mi.idx {
sum += idx.Count(t)
}
return sum
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// Remove deletes an index from the MasterIndex.
func (mi *MasterIndex) Remove(index *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
for i, idx := range mi.idx {
if idx == index {
mi.idx = append(mi.idx[:i], mi.idx[i+1:]...)
return
}
}
}
// Current returns an index that is not yet finalized, so that new entries can
// still be added. If all indexes are finalized, a new index is created and
// returned.
func (mi *MasterIndex) Current() *Index {
mi.idxMutex.RLock()
for _, idx := range mi.idx {
if !idx.Final() {
mi.idxMutex.RUnlock()
return idx
}
}
mi.idxMutex.RUnlock()
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
newIdx := NewIndex()
mi.idx = append(mi.idx, newIdx)
return newIdx
}
// AddInFlight add the given ID to the list of in-flight IDs. An error is
// returned when the ID is already in the list. Setting ignoreDuplicates to
// true only checks the in flight list, otherwise the index itself is also
// tested.
func (mi *MasterIndex) AddInFlight(id backend.ID, ignoreDuplicates bool) error {
// The index + inFlight store must be searched for a matching id in one
// atomic operation. This requires locking the inFlight store and the
// index together!
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
if !ignoreDuplicates {
// Note: mi.Has read locks the index again.
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
}
debug.Log("MasterIndex.AddInFlight", "adding %v", id.Str())
if mi.inFlight.Has(id) {
return fmt.Errorf("%v is already in flight", id.Str())
}
if !ignoreDuplicates {
if mi.Has(id) {
return fmt.Errorf("%v is already indexed (fully processed)", id)
}
}
mi.inFlight.Insert(id)
return nil
}
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
// The index + inFlight store must be searched for a matching id in one
// atomic operation. This requires locking the inFlight store and the
// index together!
mi.inFlight.RLock()
defer mi.inFlight.RUnlock()
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
inFlight := mi.inFlight.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
indexed := mi.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
return inFlight
}
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str())
mi.inFlight.Delete(id)
}
// NotFinalIndexes returns all indexes that have not yet been saved.
func (mi *MasterIndex) NotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
list = append(list, idx)
}
}
debug.Log("MasterIndex.NotFinalIndexes", "return %d indexes", len(list))
return list
}
// FullIndexes returns all indexes that are full.
func (mi *MasterIndex) FullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("MasterIndex.FullIndexes", "checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
debug.Log("MasterIndex.FullIndexes", "index %p is final", idx)
continue
}
if IndexFull(idx) {
debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
list = append(list, idx)
} else {
debug.Log("MasterIndex.FullIndexes", "index %p not full", idx)
}
}
debug.Log("MasterIndex.FullIndexes", "return %d indexes", len(list))
return list
}
// All returns all indexes.
func (mi *MasterIndex) All() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
return mi.idx
}
// RebuildIndex combines all known indexes to a new index, leaving out any
// packs whose ID is contained in packBlacklist. The new index contains the IDs
// of all known indexes in the "supersedes" field.
func (mi *MasterIndex) RebuildIndex(packBlacklist backend.IDSet) (*Index, error) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
debug.Log("MasterIndex.RebuildIndex", "start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
newIndex := NewIndex()
done := make(chan struct{})
defer close(done)
for i, idx := range mi.idx {
debug.Log("MasterIndex.RebuildIndex", "adding index %d", i)
for pb := range idx.Each(done) {
if packBlacklist.Has(pb.PackID) {
continue
}
newIndex.Store(pb)
}
if !idx.Final() {
debug.Log("MasterIndex.RebuildIndex", "index %d isn't final, don't add to supersedes field", i)
continue
}
id, err := idx.ID()
if err != nil {
debug.Log("MasterIndex.RebuildIndex", "index %d does not have an ID: %v", err)
return nil, err
}
debug.Log("MasterIndex.RebuildIndex", "adding index id %v to supersedes field", id.Str())
err = newIndex.AddToSupersedes(id)
if err != nil {
return nil, err
}
}
return newIndex, nil
}