https://github.com/etcd-io/etcd
Revision 433f2ee1bc690ce2b1880bfa497b28b7362f13db authored by Xiang Li on 29 June 2015, 20:44:39 UTC, committed by Xiang Li on 29 June 2015, 20:44:43 UTC
Add a restore func to correctly restore create reversion and
version of keys for the index.
1 parent ccca2b0
Raw File
Tip revision: 433f2ee1bc690ce2b1880bfa497b28b7362f13db authored by Xiang Li on 29 June 2015, 20:44:39 UTC
storage: correctly restore create and ver
Tip revision: 433f2ee
index.go
package storage

import (
	"log"
	"sync"

	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/google/btree"
)

// index maps keys to the reversions at which they were created, modified
// and deleted. The notes below describe what the treeIndex implementation
// in this file does; the per-key bookkeeping lives in keyIndex.
type index interface {
	// Get looks up key as of atRev and returns its modified reversion,
	// its created reversion and its version. treeIndex returns
	// ErrReversionNotFound when the key is not indexed at all.
	Get(key []byte, atRev int64) (rev, created reversion, ver int64, err error)
	// Range returns the keys, and the reversion of each key as of atRev,
	// in parallel slices. In treeIndex a nil end selects the single key
	// `key`; otherwise items are visited starting at key and stopping at
	// the first item that is not Less than end.
	Range(key, end []byte, atRev int64) ([][]byte, []reversion)
	// Put records rev for key, creating the key's entry if necessary.
	Put(key []byte, rev reversion)
	// Restore re-creates the entry for key from stored metadata. In
	// treeIndex, created and ver are only used when the key is not
	// indexed yet; for an existing key only modified is recorded.
	Restore(key []byte, created, modified reversion, ver int64)
	// Tombstone marks key as deleted at rev. treeIndex returns
	// ErrReversionNotFound when the key is not indexed.
	Tombstone(key []byte, rev reversion) error
	// Compact compacts every key at rev and returns a set of reversions
	// filled in by keyIndex.compact.
	// NOTE(review): presumably the reversions that must be kept after
	// compaction -- confirm against keyIndex.compact.
	Compact(rev int64) map[reversion]struct{}
	// Equal reports whether b holds the same entries as this index.
	Equal(b index) bool
}

// treeIndex is an index implementation that stores one *keyIndex per key
// in a btree.
type treeIndex struct {
	// The embedded RWMutex is taken by Put, Restore, Get, Range, Tombstone
	// and Compact around their accesses to tree.
	sync.RWMutex
	// tree holds *keyIndex items.
	tree *btree.BTree
}

// newTreeIndex returns an empty treeIndex whose btree is created with
// btree.New(32).
func newTreeIndex() index {
	ti := new(treeIndex)
	ti.tree = btree.New(32)
	return ti
}

// Put records rev for key. When the key is not indexed yet, a fresh
// keyIndex carrying rev is inserted into the tree; otherwise rev is added
// to the existing keyIndex.
func (ti *treeIndex) Put(key []byte, rev reversion) {
	probe := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()

	if found := ti.tree.Get(probe); found != nil {
		found.(*keyIndex).put(rev.main, rev.sub)
		return
	}

	// First write to this key: the probe itself becomes the stored entry.
	probe.put(rev.main, rev.sub)
	ti.tree.ReplaceOrInsert(probe)
}

// Restore rebuilds the index entry for key. When the key is not indexed
// yet, a new keyIndex is restored from created, modified and ver and
// inserted into the tree. When the key is already indexed, only modified
// is recorded through keyIndex.put; created and ver are not used.
func (ti *treeIndex) Restore(key []byte, created, modified reversion, ver int64) {
	probe := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()

	if found := ti.tree.Get(probe); found != nil {
		found.(*keyIndex).put(modified.main, modified.sub)
		return
	}

	probe.restore(created, modified, ver)
	ti.tree.ReplaceOrInsert(probe)
}

// Get returns what keyIndex.get reports for key as of atRev: the modified
// reversion, the created reversion and the version. It returns zero values
// and ErrReversionNotFound when the key is not in the tree.
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created reversion, ver int64, err error) {
	probe := &keyIndex{key: key}

	ti.RLock()
	defer ti.RUnlock()

	found := ti.tree.Get(probe)
	if found == nil {
		// The named results are still at their zero values here.
		return modified, created, ver, ErrReversionNotFound
	}
	return found.(*keyIndex).get(atRev)
}

// Range returns the keys, and the reversion of each key as of atRev, as
// two parallel slices.
//
// A nil end requests the single key `key`; if that lookup fails, both
// results are nil. Otherwise the tree is walked in ascending order starting
// at key and stopping at the first item that is not Less than end. Keys for
// which keyIndex.get returns an error at atRev are skipped.
func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []reversion) {
	if end == nil {
		if rev, _, _, err := ti.Get(key, atRev); err == nil {
			return [][]byte{key}, []reversion{rev}
		}
		return nil, nil
	}

	lower, upper := &keyIndex{key: key}, &keyIndex{key: end}

	ti.RLock()
	defer ti.RUnlock()

	visit := func(it btree.Item) bool {
		if !it.Less(upper) {
			// Reached the end of the requested range; stop iterating.
			return false
		}
		ki := it.(*keyIndex)
		if rev, _, _, err := ki.get(atRev); err == nil {
			keys = append(keys, ki.key)
			revs = append(revs, rev)
		}
		return true
	}
	ti.tree.AscendGreaterOrEqual(lower, visit)

	return keys, revs
}

// Tombstone calls keyIndex.tombstone for key at rev. It returns
// ErrReversionNotFound when the key is not in the tree, and nil otherwise.
func (ti *treeIndex) Tombstone(key []byte, rev reversion) error {
	probe := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()

	found := ti.tree.Get(probe)
	if found == nil {
		return ErrReversionNotFound
	}

	found.(*keyIndex).tombstone(rev.main, rev.sub)
	return nil
}

// Compact runs keyIndex.compact at rev on every entry of the tree, removes
// the entries that are empty afterwards, and returns the set of reversions
// collected by keyIndex.compact. It panics if an entry reported as empty
// cannot be deleted from the tree.
func (ti *treeIndex) Compact(rev int64) map[reversion]struct{} {
	available := make(map[reversion]struct{})
	var emptied []*keyIndex
	log.Printf("store.index: compact %d", rev)

	// TODO: do not hold the lock for long time?
	// This is probably OK. Compacting 10M keys takes O(10ms).
	ti.Lock()
	defer ti.Unlock()

	// Empty entries are collected during the walk and deleted only after
	// it has finished.
	ti.tree.Ascend(compactIndex(rev, available, &emptied))
	for _, ki := range emptied {
		if removed := ti.tree.Delete(ki); removed == nil {
			log.Panic("store.index: unexpected delete failure during compaction")
		}
	}
	return available
}

// compactIndex returns a btree iterator callback that compacts each visited
// keyIndex at rev, passing available through to keyIndex.compact, and
// appends every keyIndex that is empty afterwards to *emptyki. The callback
// always returns true, so the whole tree is visited.
func compactIndex(rev int64, available map[reversion]struct{}, emptyki *[]*keyIndex) func(i btree.Item) bool {
	visit := func(i btree.Item) bool {
		ki := i.(*keyIndex)
		ki.compact(rev, available)
		if !ki.isEmpty() {
			return true
		}
		*emptyki = append(*emptyki, ki)
		return true
	}
	return visit
}

// Equal reports whether bi is a *treeIndex holding the same entries as ti:
// both trees must have the same length, and every keyIndex in ti must have
// a counterpart in bi that keyIndex.equal accepts.
//
// It returns false, rather than panicking, when bi is nil, is not a
// *treeIndex, or is missing one of ti's keys.
//
// Equal does not take the lock of either index.
func (ti *treeIndex) Equal(bi index) bool {
	b, ok := bi.(*treeIndex)
	if !ok || b == nil {
		return false
	}

	if ti.tree.Len() != b.tree.Len() {
		return false
	}

	equal := true

	ti.tree.Ascend(func(item btree.Item) bool {
		aki := item.(*keyIndex)
		// Get returns a nil Item when b lacks this key; the comma-ok form
		// turns that case into "not equal" instead of a panic.
		bki, found := b.tree.Get(item).(*keyIndex)
		if !found || !aki.equal(bki) {
			equal = false
			return false
		}
		return true
	})

	return equal
}
back to top