Revision 19ad634673c51cb77b68d320c82b4b4adf690bfd authored by kayrus on 31 July 2015, 11:25:22 UTC, committed by kayrus on 09 September 2015, 18:07:31 UTC
1 parent af0474f
Raw File
kvstore_test.go
package storage

import (
	"crypto/rand"
	"encoding/binary"
	"errors"
	"io"
	"math"
	"os"
	"reflect"
	"testing"
	"time"

	"github.com/coreos/etcd/pkg/testutil"
	"github.com/coreos/etcd/storage/backend"
	"github.com/coreos/etcd/storage/storagepb"
)

// TestStorePut feeds store.put a canned index lookup result and then checks
// the backend write it performs, the calls it makes on the index, and the
// store's current revision afterwards.
func TestStorePut(t *testing.T) {
	cases := []struct {
		rev revision     // store.currentRev before the put
		r   indexGetResp // canned reply handed out by the fake index Get

		wrev    revision        // expected store.currentRev after the put
		wev     storagepb.Event // expected event written to the backend
		wputrev revision        // expected revision used as backend key and index entry
	}{
		{
			// The index reports that the key has no revision yet.
			rev:  revision{1, 0},
			r:    indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
			wrev: revision{1, 1},
			wev: storagepb.Event{
				Type: storagepb.PUT,
				Kv: &storagepb.KeyValue{
					Key:            []byte("foo"),
					Value:          []byte("bar"),
					CreateRevision: 2,
					ModRevision:    2,
					Version:        1,
				},
			},
			wputrev: revision{2, 0},
		},
		{
			// The index already knows the key at version 1.
			rev:  revision{1, 1},
			r:    indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
			wrev: revision{1, 2},
			wev: storagepb.Event{
				Type: storagepb.PUT,
				Kv: &storagepb.KeyValue{
					Key:            []byte("foo"),
					Value:          []byte("bar"),
					CreateRevision: 2,
					ModRevision:    2,
					Version:        2,
				},
			},
			wputrev: revision{2, 1},
		},
		{
			// The index already knows the key at version 2.
			rev:  revision{2, 0},
			r:    indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
			wrev: revision{2, 1},
			wev: storagepb.Event{
				Type: storagepb.PUT,
				Kv: &storagepb.KeyValue{
					Key:            []byte("foo"),
					Value:          []byte("bar"),
					CreateRevision: 2,
					ModRevision:    3,
					Version:        3,
				},
			},
			wputrev: revision{3, 0},
		},
	}
	for i, tc := range cases {
		s, b, index := newFakeStore()
		s.currentRev = tc.rev
		index.indexGetRespc <- tc.r

		s.put([]byte("foo"), []byte("bar"))

		evb, err := tc.wev.Marshal()
		if err != nil {
			t.Errorf("#%d: marshal err = %v, want nil", i, err)
		}

		// Exactly one backend write is expected: the marshaled event keyed by revision.
		wantTx := []testutil.Action{
			{Name: "put", Params: []interface{}{keyBucketName, newTestBytes(tc.wputrev), evb}},
		}
		if got := b.tx.Action(); !reflect.DeepEqual(got, wantTx) {
			t.Errorf("#%d: tx action = %+v, want %+v", i, got, wantTx)
		}

		// The index is expected to be read first and then updated.
		wantIdx := []testutil.Action{
			{Name: "get", Params: []interface{}{[]byte("foo"), tc.wputrev.main}},
			{Name: "put", Params: []interface{}{[]byte("foo"), tc.wputrev}},
		}
		if got := index.Action(); !reflect.DeepEqual(got, wantIdx) {
			t.Errorf("#%d: index action = %+v, want %+v", i, got, wantIdx)
		}

		if s.currentRev != tc.wrev {
			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tc.wrev)
		}
	}
}

// TestStoreRange primes the fake index and fake backend with canned range
// replies and checks what store.rangeKeys returns, which backend and index
// calls it issues, and that the store's current revision is left unchanged.
func TestStoreRange(t *testing.T) {
	ev := storagepb.Event{
		Type: storagepb.PUT,
		Kv: &storagepb.KeyValue{
			Key:            []byte("foo"),
			Value:          []byte("bar"),
			CreateRevision: 1,
			ModRevision:    2,
			Version:        1,
		},
	}
	evb, err := ev.Marshal()
	if err != nil {
		t.Fatal(err)
	}
	currev := revision{1, 1}
	wrev := int64(2)

	cases := []struct {
		idxr indexRangeResp // canned reply for the index Range call
		r    rangeResp      // canned reply for the backend range call
	}{
		{
			idxr: indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
			r:    rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
		},
		{
			// The index reports two keys; only the first revision is
			// expected to be fetched from the backend (see wantTx below).
			idxr: indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
			r:    rangeResp{[][]byte{newTestBytes(revision{2, 0})}, [][]byte{evb}},
		},
	}
	for i, tc := range cases {
		s, b, index := newFakeStore()
		s.currentRev = currev
		b.tx.rangeRespc <- tc.r
		index.indexRangeRespc <- tc.idxr

		kvs, rev, err := s.rangeKeys([]byte("foo"), []byte("goo"), 1, 0)
		if err != nil {
			t.Errorf("#%d: err = %v, want nil", i, err)
		}
		wantKVs := []storagepb.KeyValue{*ev.Kv}
		if !reflect.DeepEqual(kvs, wantKVs) {
			t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wantKVs)
		}
		if rev != wrev {
			t.Errorf("#%d: rev = %d, want %d", i, rev, wrev)
		}

		wantTx := []testutil.Action{
			{Name: "range", Params: []interface{}{keyBucketName, newTestBytes(tc.idxr.revs[0]), []byte(nil), int64(0)}},
		}
		if got := b.tx.Action(); !reflect.DeepEqual(got, wantTx) {
			t.Errorf("#%d: tx action = %+v, want %+v", i, got, wantTx)
		}

		wantIdx := []testutil.Action{
			{Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), wrev}},
		}
		if got := index.Action(); !reflect.DeepEqual(got, wantIdx) {
			t.Errorf("#%d: index action = %+v, want %+v", i, got, wantIdx)
		}

		// A range must not move the store's revision.
		if s.currentRev != currev {
			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
		}
	}
}

// TestStoreDeleteRange primes the fake index with a single matching key and
// checks that store.deleteRange reports one deletion, writes a DELETE event
// to the backend, tombstones the key in the index, and advances the store's
// current revision as expected.
func TestStoreDeleteRange(t *testing.T) {
	cases := []struct {
		rev revision       // store.currentRev before the delete
		r   indexRangeResp // canned reply for the index Range call

		wrev    revision // expected store.currentRev after the delete
		wrrev   int64    // expected revision passed to the index Range call
		wdelrev revision // expected revision of the tombstone / backend key
	}{
		{
			rev:     revision{2, 0},
			r:       indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
			wrev:    revision{2, 1},
			wrrev:   2,
			wdelrev: revision{3, 0},
		},
		{
			rev:     revision{2, 1},
			r:       indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
			wrev:    revision{2, 2},
			wrrev:   3,
			wdelrev: revision{3, 1},
		},
	}
	for i, tc := range cases {
		s, b, index := newFakeStore()
		s.currentRev = tc.rev
		index.indexRangeRespc <- tc.r

		if n := s.deleteRange([]byte("foo"), []byte("goo")); n != 1 {
			t.Errorf("#%d: n = %d, want 1", i, n)
		}

		// The backend is expected to receive a DELETE event carrying only the key.
		delev := storagepb.Event{
			Type: storagepb.DELETE,
			Kv: &storagepb.KeyValue{
				Key: []byte("foo"),
			},
		}
		evb, err := delev.Marshal()
		if err != nil {
			t.Errorf("#%d: marshal err = %v, want nil", i, err)
		}

		wantTx := []testutil.Action{
			{Name: "put", Params: []interface{}{keyBucketName, newTestBytes(tc.wdelrev), evb}},
		}
		if got := b.tx.Action(); !reflect.DeepEqual(got, wantTx) {
			t.Errorf("#%d: tx action = %+v, want %+v", i, got, wantTx)
		}

		wantIdx := []testutil.Action{
			{Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), tc.wrrev}},
			{Name: "tombstone", Params: []interface{}{[]byte("foo"), tc.wdelrev}},
		}
		if got := index.Action(); !reflect.DeepEqual(got, wantIdx) {
			t.Errorf("#%d: index action = %+v, want %+v", i, got, wantIdx)
		}

		if s.currentRev != tc.wrev {
			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tc.wrev)
		}
	}
}

// TestStoreCompact runs store.Compact(3) against the fakes and checks the
// recorded compaction main revision plus the exact sequence of backend and
// index calls.
func TestStoreCompact(t *testing.T) {
	s, b, index := newFakeStore()
	s.currentRev = revision{3, 0}

	// The index compaction returns revision {1, 0}; the backend range returns
	// keys for {1, 0} and {2, 0}. Only {2, 0} is expected to be deleted.
	fromIndex := map[revision]struct{}{revision{1, 0}: {}}
	index.indexCompactRespc <- fromIndex
	stored := [][]byte{newTestBytes(revision{1, 0}), newTestBytes(revision{2, 0})}
	b.tx.rangeRespc <- rangeResp{keys: stored, vals: nil}

	s.Compact(3)
	// Wait on the store's WaitGroup before inspecting the recorded actions.
	s.wg.Wait()

	if s.compactMainRev != 3 {
		t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
	}

	// Expected end key of the backend scan: 8 bytes holding big-endian 4.
	end := make([]byte, 8)
	binary.BigEndian.PutUint64(end, uint64(4))

	wantTx := []testutil.Action{
		{Name: "put", Params: []interface{}{metaBucketName, scheduledCompactKeyName, newTestBytes(revision{3, 0})}},
		{Name: "range", Params: []interface{}{keyBucketName, make([]byte, 17), end, int64(10000)}},
		{Name: "delete", Params: []interface{}{keyBucketName, newTestBytes(revision{2, 0})}},
		{Name: "put", Params: []interface{}{metaBucketName, finishedCompactKeyName, newTestBytes(revision{3, 0})}},
	}
	if got := b.tx.Action(); !reflect.DeepEqual(got, wantTx) {
		t.Errorf("tx actions = %+v, want %+v", got, wantTx)
	}

	wantIdx := []testutil.Action{
		{Name: "compact", Params: []interface{}{int64(3)}},
	}
	if got := index.Action(); !reflect.DeepEqual(got, wantIdx) {
		t.Errorf("index action = %+v, want %+v", got, wantIdx)
	}
}

// TestStoreRestore primes the fake backend with a finished-compaction marker,
// two stored events (a PUT at revision {3, 0} and a DELETE at {4, 0}) and a
// scheduled-compaction marker, then checks that store.Restore sets the
// compaction and current revisions and replays the events into the index.
func TestStoreRestore(t *testing.T) {
	s, b, index := newFakeStore()

	putev := storagepb.Event{
		Type: storagepb.PUT,
		Kv: &storagepb.KeyValue{
			Key:            []byte("foo"),
			Value:          []byte("bar"),
			CreateRevision: 3,
			ModRevision:    3,
			Version:        1,
		},
	}
	putevb, err := putev.Marshal()
	if err != nil {
		t.Fatal(err)
	}
	delev := storagepb.Event{
		Type: storagepb.DELETE,
		Kv: &storagepb.KeyValue{
			Key: []byte("foo"),
		},
	}
	delevb, err := delev.Marshal()
	if err != nil {
		t.Fatal(err)
	}

	// Canned replies, in the order the three backend ranges are expected
	// (see wact below): finished-compaction meta key, key bucket contents,
	// scheduled-compaction meta key.
	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}
	b.tx.rangeRespc <- rangeResp{[][]byte{newTestBytes(revision{3, 0}), newTestBytes(revision{4, 0})}, [][]byte{putevb, delevb}}
	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestBytes(revision{2, 0})}}

	s.Restore()

	// The failure message previously claimed "want 4" although the assertion
	// checks for 2; the message now matches the checked value.
	if s.compactMainRev != 2 {
		t.Errorf("compact rev = %d, want 2", s.compactMainRev)
	}
	wrev := revision{4, 0}
	if !reflect.DeepEqual(s.currentRev, wrev) {
		t.Errorf("current rev = %v, want %v", s.currentRev, wrev)
	}
	wact := []testutil.Action{
		{Name: "range", Params: []interface{}{metaBucketName, finishedCompactKeyName, []byte(nil), int64(0)}},
		{Name: "range", Params: []interface{}{keyBucketName, newTestBytes(revision{}), newTestBytes(revision{math.MaxInt64, math.MaxInt64}), int64(0)}},
		{Name: "range", Params: []interface{}{metaBucketName, scheduledCompactKeyName, []byte(nil), int64(0)}},
	}
	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
		t.Errorf("tx actions = %+v, want %+v", g, wact)
	}
	// The PUT is expected to be restored into the index and the DELETE to be
	// applied as a tombstone.
	wact = []testutil.Action{
		{Name: "restore", Params: []interface{}{[]byte("foo"), revision{3, 0}, revision{3, 0}, int64(1)}},
		{Name: "tombstone", Params: []interface{}{[]byte("foo"), revision{4, 0}}},
	}
	if g := index.Action(); !reflect.DeepEqual(g, wact) {
		t.Errorf("index action = %+v, want %+v", g, wact)
	}
}

// TestRestoreContinueUnfinishedCompaction writes a scheduled-compaction
// marker for main revision 2 without running the compaction, reopens the
// store from the same path, and checks that after Restore a range at
// revision 2 fails with ErrCompacted and the revision-1 key is gone from the
// backend.
func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
	s0 := newStore(tmpPath)
	defer os.Remove(tmpPath)

	for _, v := range []string{"bar", "bar1", "bar2"} {
		s0.Put([]byte("foo"), []byte(v))
	}

	// write scheduled compaction, but not do compaction
	scheduled := newRevBytes()
	revToBytes(revision{main: 2}, scheduled)
	tx := s0.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, scheduled)
	tx.Unlock()

	s0.Close()

	s1 := newStore(tmpPath)
	s1.Restore()

	// wait for scheduled compaction to be finished
	time.Sleep(100 * time.Millisecond)

	_, _, err := s1.Range([]byte("foo"), nil, 0, 2)
	if err != ErrCompacted {
		t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
	}

	// check the key in backend is deleted
	// TODO: compact should delete main=2 key too
	probe := newRevBytes()
	revToBytes(revision{main: 1}, probe)
	tx = s1.b.BatchTx()
	tx.Lock()
	ks, _ := tx.UnsafeRange(keyBucketName, probe, nil, 0)
	if len(ks) > 0 {
		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(probe))
	}
	tx.Unlock()
}

// BenchmarkStorePut measures store.Put with b.N distinct random 64-byte keys
// and a fixed value. Key generation happens before the timer is reset so it
// is excluded from the measurement.
func BenchmarkStorePut(b *testing.B) {
	s := newStore(tmpPath)
	defer os.Remove(tmpPath)

	// prepare keys
	keys := make([][]byte, b.N)
	for i := range keys {
		keys[i] = make([]byte, 64)
		// Do not silently benchmark with partially filled keys if the
		// random source fails.
		if _, err := rand.Read(keys[i]); err != nil {
			b.Fatalf("generating random key #%d: %v", i, err)
		}
	}

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.Put(keys[i], []byte("foo"))
	}
}

// newTestBytes returns a fresh buffer from newRevBytes that has been filled
// with rev by revToBytes.
func newTestBytes(rev revision) []byte {
	buf := newRevBytes()
	revToBytes(rev, buf)
	return buf
}

// newFakeStore builds a store wired to a recording fake backend and a
// recording fake index, and returns all three so tests can prime canned
// replies and inspect recorded actions. The backend range channel buffers up
// to 5 replies; each index channel buffers 1. The store starts at the zero
// revision with compactMainRev set to -1.
func newFakeStore() (*store, *fakeBackend, *fakeIndex) {
	tx := &fakeBatchTx{rangeRespc: make(chan rangeResp, 5)}
	be := &fakeBackend{tx: tx}
	idx := &fakeIndex{
		indexGetRespc:     make(chan indexGetResp, 1),
		indexRangeRespc:   make(chan indexRangeResp, 1),
		indexCompactRespc: make(chan map[revision]struct{}, 1),
	}
	s := &store{
		b:              be,
		kvindex:        idx,
		currentRev:     revision{},
		compactMainRev: -1,
	}
	return s, be, idx
}

// rangeResp is a canned reply for fakeBatchTx.UnsafeRange: the keys and
// values it should return.
type rangeResp struct {
	keys, vals [][]byte
}

// fakeBatchTx is a test double for a backend batch transaction. Put, range
// and delete calls are recorded through the embedded Recorder; range calls
// additionally take their return values from rangeRespc.
type fakeBatchTx struct {
	testutil.Recorder
	// rangeRespc supplies the canned replies handed out by UnsafeRange, one per call.
	rangeRespc chan rangeResp
}

// Lock is a no-op in the fake.
func (b *fakeBatchTx) Lock() {
}

// Unlock is a no-op in the fake.
func (b *fakeBatchTx) Unlock() {
}

// UnsafeCreateBucket is a no-op in the fake; the call is not recorded.
func (b *fakeBatchTx) UnsafeCreateBucket(name []byte) {
}
// UnsafePut records a "put" action carrying the bucket name, key and value.
// Nothing is actually stored.
func (b *fakeBatchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	act := testutil.Action{
		Name:   "put",
		Params: []interface{}{bucketName, key, value},
	}
	b.Recorder.Record(act)
}
// UnsafeRange records a "range" action carrying its arguments, then receives
// the next canned reply from rangeRespc and returns its keys and values. It
// blocks until a reply is available on the channel.
func (b *fakeBatchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
	act := testutil.Action{
		Name:   "range",
		Params: []interface{}{bucketName, key, endKey, limit},
	}
	b.Recorder.Record(act)

	resp := <-b.rangeRespc
	return resp.keys, resp.vals
}
// UnsafeDelete records a "delete" action carrying the bucket name and key.
// Nothing is actually removed.
func (b *fakeBatchTx) UnsafeDelete(bucketName []byte, key []byte) {
	act := testutil.Action{
		Name:   "delete",
		Params: []interface{}{bucketName, key},
	}
	b.Recorder.Record(act)
}
// Commit is a no-op in the fake; the call is not recorded.
func (b *fakeBatchTx) Commit() {
}

// CommitAndStop is a no-op in the fake; the call is not recorded.
func (b *fakeBatchTx) CommitAndStop() {
}

// fakeBackend is a test double for the storage backend. It wraps a single
// fakeBatchTx, which BatchTx returns on every call.
type fakeBackend struct {
	tx *fakeBatchTx
}

// BatchTx returns the fake's single recording transaction.
func (b *fakeBackend) BatchTx() backend.BatchTx {
	return b.tx
}

// Snapshot is not supported by the fake: it writes nothing and always
// returns 0 together with an "unsupported" error.
func (b *fakeBackend) Snapshot(w io.Writer) (n int64, err error) {
	return 0, errors.New("unsupported")
}

// ForceCommit is a no-op in the fake.
func (b *fakeBackend) ForceCommit() {
}

// Close does nothing and always returns nil.
func (b *fakeBackend) Close() error {
	return nil
}

// indexGetResp is a canned reply for fakeIndex.Get; its fields are returned
// in declaration order as the method's four results.
type indexGetResp struct {
	rev, created revision
	ver          int64
	err          error
}

// indexRangeResp is a canned reply for fakeIndex.Range: the keys and
// revisions that the method returns.
type indexRangeResp struct {
	keys [][]byte
	revs []revision
}

// fakeIndex is a test double for the key index. Its Get, Range, Put, Restore,
// Tombstone and Compact methods record an action through the embedded
// Recorder; Get, Range and Compact take their return values from the
// channels below.
type fakeIndex struct {
	testutil.Recorder
	indexGetRespc     chan indexGetResp          // canned replies for Get
	indexRangeRespc   chan indexRangeResp        // canned replies for Range
	indexCompactRespc chan map[revision]struct{} // canned replies for Compact
}

// Get records a "get" action carrying key and atRev, then receives the next
// canned reply from indexGetRespc and returns its fields. It blocks until a
// reply is available on the channel.
func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
	act := testutil.Action{
		Name:   "get",
		Params: []interface{}{key, atRev},
	}
	i.Recorder.Record(act)

	resp := <-i.indexGetRespc
	return resp.rev, resp.created, resp.ver, resp.err
}
// Range records a "range" action carrying key, end and atRev, then receives
// the next canned reply from indexRangeRespc and returns its keys and
// revisions. It blocks until a reply is available on the channel.
func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
	act := testutil.Action{
		Name:   "range",
		Params: []interface{}{key, end, atRev},
	}
	i.Recorder.Record(act)

	resp := <-i.indexRangeRespc
	return resp.keys, resp.revs
}
// Put records a "put" action carrying key and rev. Nothing is indexed.
func (i *fakeIndex) Put(key []byte, rev revision) {
	act := testutil.Action{
		Name:   "put",
		Params: []interface{}{key, rev},
	}
	i.Recorder.Record(act)
}
// Restore records a "restore" action carrying key, created, modified and
// ver. Nothing is indexed.
func (i *fakeIndex) Restore(key []byte, created, modified revision, ver int64) {
	act := testutil.Action{
		Name:   "restore",
		Params: []interface{}{key, created, modified, ver},
	}
	i.Recorder.Record(act)
}
// Tombstone records a "tombstone" action carrying key and rev and always
// returns nil.
func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
	act := testutil.Action{
		Name:   "tombstone",
		Params: []interface{}{key, rev},
	}
	i.Recorder.Record(act)
	return nil
}
// Compact records a "compact" action carrying rev, then returns the next
// canned revision set received from indexCompactRespc. It blocks until a
// reply is available on the channel.
func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
	act := testutil.Action{
		Name:   "compact",
		Params: []interface{}{rev},
	}
	i.Recorder.Record(act)

	resp := <-i.indexCompactRespc
	return resp
}
// Equal always reports false; the fake never compares itself to another
// index, and the call is not recorded.
func (i *fakeIndex) Equal(b index) bool {
	return false
}
back to top