package evidence_test

import (
	"encoding/hex"
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
	"github.com/go-kit/log/term"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	dbm "github.com/tendermint/tm-db"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/crypto"
	"github.com/tendermint/tendermint/crypto/tmhash"
	"github.com/tendermint/tendermint/evidence"
	"github.com/tendermint/tendermint/evidence/mocks"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
	tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
)

var (
	numEvidence = 10
	timeout     = 120 * time.Second // ridiculously high because CircleCI is slow
)

// We have N evidence reactors connected to one another. The first reactor
// receives a number of evidence items at varying heights. We test that all
// other reactors receive the evidence and add it to their own respective
// evidence pools.
func TestReactorBroadcastEvidence(t *testing.T) {
	config := cfg.TestConfig()
	N := 7

	// create statedb for everyone
	stateDBs := make([]sm.Store, N)
	val := types.NewMockPV()
	// we need validators saved for heights at least as high as we have evidence for
	height := int64(numEvidence) + 10
	for i := 0; i < N; i++ {
		stateDBs[i] = initializeValidatorState(val, height)
	}

	// make reactors from statedb
	reactors, pools := makeAndConnectReactorsAndPools(config, stateDBs)

	// set the peer height on each reactor
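	// the reactor reads this value back via types.PeerStateKey and only gossips
	// evidence from heights the peer has already reached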
	for _, r := range reactors {
		for _, peer := range r.Switch.Peers().List() {
			ps := peerState{height}
			peer.Set(types.PeerStateKey, ps)
		}
	}

	// send a bunch of valid evidence to the first reactor's evpool
	// and wait for it all to be received by the others
	evList := sendEvidence(t, pools[0], val, numEvidence)
	waitForEvidence(t, evList, pools)
}

// We have two evidence reactors connected to one another but at different heights.
// Reactor 1, which is ahead, receives a number of evidence items. It should only send
// the evidence that is below the height of the peer to that peer.
func TestReactorSelectiveBroadcast(t *testing.T) {
	config := cfg.TestConfig()

	val := types.NewMockPV()
	height1 := int64(numEvidence) + 10
	height2 := int64(numEvidence) / 2

	// DB1 is ahead of DB2
	stateDB1 := initializeValidatorState(val, height1)
	stateDB2 := initializeValidatorState(val, height2)

	// make reactors from statedb
	reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})

	// set the peer height on each reactor
	for _, r := range reactors {
		for _, peer := range r.Switch.Peers().List() {
			ps := peerState{height1}
			peer.Set(types.PeerStateKey, ps)
		}
	}

	// update the first reactor peer's height to be very small
	peer := reactors[0].Switch.Peers().List()[0]
	ps := peerState{height2}
	peer.Set(types.PeerStateKey, ps)

	// send a bunch of valid evidence to the first reactor's evpool
	evList := sendEvidence(t, pools[0], val, numEvidence)

	// only evidence at heights below the peer's height should make it through
	waitForEvidence(t, evList[:numEvidence/2-1], []*evidence.Pool{pools[1]})

	// peers should still be connected
	peers := reactors[1].Switch.Peers().List()
	assert.Equal(t, 1, len(peers))
}

// This test aims to ensure that reactors don't send evidence that they have already
// committed or that is not yet ready for the peer, covering three scenarios:
// First, committed evidence sent to a newly connected peer
// Second, evidence sent to a peer that is behind
// Third, evidence that was pending and became committed just before the peer caught up
func TestReactorsGossipNoCommittedEvidence(t *testing.T) {
	config := cfg.TestConfig()

	val := types.NewMockPV()
	var height int64 = 10

	// DB1 is ahead of DB2
	stateDB1 := initializeValidatorState(val, height-1)
	stateDB2 := initializeValidatorState(val, height-2)
	state, err := stateDB1.Load()
	require.NoError(t, err)
	state.LastBlockHeight++

	// make reactors from statedb
	reactors, pools := makeAndConnectReactorsAndPools(config, []sm.Store{stateDB1, stateDB2})

	evList := sendEvidence(t, pools[0], val, 2)
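	// Update marks the evidence as committed at the new state height, pruning it
	// from the pending pool (hence the size of 0 asserted below)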
	pools[0].Update(state, evList)
	require.EqualValues(t, uint32(0), pools[0].Size())

	time.Sleep(100 * time.Millisecond)

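	// reactor 0 now sees its peer at height-2 (still behind), while reactor 1
	// sees its peer caught up to height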
	peer := reactors[0].Switch.Peers().List()[0]
	ps := peerState{height - 2}
	peer.Set(types.PeerStateKey, ps)

	peer = reactors[1].Switch.Peers().List()[0]
	ps = peerState{height}
	peer.Set(types.PeerStateKey, ps)

	// wait to see that no evidence comes through
	time.Sleep(300 * time.Millisecond)

	// the second pool should not have received any evidence because it has already been committed
	assert.Equal(t, uint32(0), pools[1].Size(), "second reactor should not have received evidence")

	// the first reactor receives three more evidence items
	evList = make([]types.Evidence, 3)
	for i := 0; i < 3; i++ {
		ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(height-3+int64(i),
			time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, state.ChainID)
		require.NoError(t, err)
		err = pools[0].AddEvidence(ev)
		require.NoError(t, err)
		evList[i] = ev
	}

	// wait to see that only one evidence item is sent
	time.Sleep(300 * time.Millisecond)

	// the second pool should only have received the first evidence because it is behind
	peerEv, _ := pools[1].PendingEvidence(10000)
	assert.EqualValues(t, []types.Evidence{evList[0]}, peerEv)

	// the last evidence is committed and the second reactor catches up in state to the first
	// reactor. We therefore expect that the second reactor only receives one more evidence, the
	// one that is still pending and not the evidence that has already been committed.
	state.LastBlockHeight++
	pools[0].Update(state, []types.Evidence{evList[2]})
	// the first reactor should have the two remaining pending evidence
	require.EqualValues(t, uint32(2), pools[0].Size())

	// now update the state of the second reactor
	pools[1].Update(state, types.EvidenceList{})
	peer = reactors[0].Switch.Peers().List()[0]
	ps = peerState{height}
	peer.Set(types.PeerStateKey, ps)

	// wait to see that only the two pending evidence items are sent
	time.Sleep(300 * time.Millisecond)

	peerEv, _ = pools[1].PendingEvidence(1000)
	assert.EqualValues(t, []types.Evidence{evList[0], evList[1]}, peerEv)
}

func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
	evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
	evidenceDB := dbm.NewMemDB()
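	// mock a block store that reports a constant header time so the test
	// evidence is never treated as expired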
	blockStore := &mocks.BlockStore{}
	blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
		&types.BlockMeta{Header: types.Header{Time: evidenceTime}},
	)
	val := types.NewMockPV()
	stateStore := initializeValidatorState(val, 1)
	pool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
	require.NoError(t, err)

	p := &p2pmocks.Peer{}

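	// IsRunning reports true exactly once, letting broadcastEvidenceRoutine start,
	// then false, which should make the routine return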
	p.On("IsRunning").Once().Return(true)
	p.On("IsRunning").Return(false)
	// check that we are not leaking any go-routines
	// i.e. broadcastEvidenceRoutine finishes when peer is stopped
	defer leaktest.CheckTimeout(t, 10*time.Second)()

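	// Send on the evidence channel always fails, so the routine keeps retrying
	// until the peer is reported as stopped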
	p.On("Send", mock.MatchedBy(func(i interface{}) bool {
		e, ok := i.(p2p.Envelope)
		return ok && e.ChannelID == evidence.EvidenceChannel
	})).Return(false)
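	// a receive-only channel that is never closed: the routine cannot exit via
	// Quit and must stop because IsRunning reports false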
	quitChan := make(<-chan struct{})
	p.On("Quit").Return(quitChan)
	ps := peerState{2}
	p.On("Get", types.PeerStateKey).Return(ps)
	p.On("ID").Return("ABC")
	p.On("String").Return("mock")

	r := evidence.NewReactor(pool)
	r.SetLogger(log.TestingLogger())
	r.AddPeer(p)

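	// adding evidence kicks the broadcast routine into attempting a send to the
	// mocked peer; the deferred leaktest above then verifies the routine exits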
	_ = sendEvidence(t, pool, val, 2)
}

// evidenceLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func evidenceLogger() log.Logger {
	return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
		for i := 0; i < len(keyvals)-1; i += 2 {
			if keyvals[i] == "validator" {
				return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
			}
		}
		return term.FgBgColor{}
	})
}

// connect N evidence reactors through N switches
func makeAndConnectReactorsAndPools(config *cfg.Config, stateStores []sm.Store) ([]*evidence.Reactor,
	[]*evidence.Pool) {
	N := len(stateStores)

	reactors := make([]*evidence.Reactor, N)
	pools := make([]*evidence.Pool, N)
	logger := evidenceLogger()
	evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)

	for i := 0; i < N; i++ {
		evidenceDB := dbm.NewMemDB()
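		// mocked block store with a constant header time, keeping all test
		// evidence within its validity window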
		blockStore := &mocks.BlockStore{}
		blockStore.On("LoadBlockMeta", mock.AnythingOfType("int64")).Return(
			&types.BlockMeta{Header: types.Header{Time: evidenceTime}},
		)
		pool, err := evidence.NewPool(evidenceDB, stateStores[i], blockStore)
		if err != nil {
			panic(err)
		}
		pools[i] = pool
		reactors[i] = evidence.NewReactor(pool)
		reactors[i].SetLogger(logger.With("validator", i))
	}

	p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("EVIDENCE", reactors[i])
		return s
	}, p2p.Connect2Switches)

	return reactors, pools
}

// wait for all evidence on all reactors
func waitForEvidence(t *testing.T, evs types.EvidenceList, pools []*evidence.Pool) {
	// wait for the evidence in all evpools
	wg := new(sync.WaitGroup)
	for i := 0; i < len(pools); i++ {
		wg.Add(1)
		go waitForEvidenceOnPool(t, wg, evs, i, pools)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.After(timeout)
	select {
	case <-timer:
		t.Fatal("Timed out waiting for evidence")
	case <-done:
	}
}

// waitForEvidenceOnPool waits for all evidence to arrive in a single evpool
func waitForEvidenceOnPool(
	t *testing.T,
	wg *sync.WaitGroup,
	evs types.EvidenceList,
	poolIdx int,
	pools []*evidence.Pool,
) {
	evpool := pools[poolIdx]
	var evList []types.Evidence
	currentPoolSize := 0
	for currentPoolSize != len(evs) {
		evList, _ = evpool.PendingEvidence(int64(len(evs) * 500)) // each piece of evidence should not be more than 500 bytes
		currentPoolSize = len(evList)
		time.Sleep(time.Millisecond * 100)
	}

	// put the reaped evidence in a map so we can quickly check we got everything
	evMap := make(map[string]types.Evidence)
	for _, e := range evList {
		evMap[string(e.Hash())] = e
	}
	for i, expectedEv := range evs {
		gotEv := evMap[string(expectedEv.Hash())]
		assert.Equal(t, expectedEv, gotEv,
			fmt.Sprintf("evidence at index %d on pool %d don't match: %v vs %v",
				i, poolIdx, expectedEv, gotEv))
	}

	wg.Done()
}

func sendEvidence(t *testing.T, evpool *evidence.Pool, val types.PrivValidator, n int) types.EvidenceList {
	evList := make([]types.Evidence, n)
	for i := 0; i < n; i++ {
		ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(int64(i+1),
			time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC), val, evidenceChainID)
		require.NoError(t, err)
		err = evpool.AddEvidence(ev)
		require.NoError(t, err)
		evList[i] = ev
	}
	return evList
}

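// peerState is a minimal stub for the value the evidence reactor expects to find
// under types.PeerStateKey: anything with a GetHeight() int64 method.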
type peerState struct {
	height int64
}

func (ps peerState) GetHeight() int64 {
	return ps.height
}

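// exampleVote builds a deterministic vote for the hard-coded test vectors below;
// t selects the signed message type (1 = prevote, 2 = precommit).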
func exampleVote(t byte) *types.Vote {
	var stamp, err = time.Parse(types.TimeFormat, "2017-12-25T03:00:01.234Z")
	if err != nil {
		panic(err)
	}

	return &types.Vote{
		Type:      tmproto.SignedMsgType(t),
		Height:    3,
		Round:     2,
		Timestamp: stamp,
		BlockID: types.BlockID{
			Hash: tmhash.Sum([]byte("blockID_hash")),
			PartSetHeader: types.PartSetHeader{
				Total: 1000000,
				Hash:  tmhash.Sum([]byte("blockID_part_set_header_hash")),
			},
		},
		ValidatorAddress: crypto.AddressHash([]byte("validator_address")),
		ValidatorIndex:   56789,
	}
}

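// TestEvidenceVectors pins the protobuf wire encoding of evidence so that
// accidental encoding changes are caught.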
//nolint:lll //ignore line length for tests
func TestEvidenceVectors(t *testing.T) {
	val := &types.Validator{
		Address:     crypto.AddressHash([]byte("validator_address")),
		VotingPower: 10,
	}

	valSet := types.NewValidatorSet([]*types.Validator{val})

	dupl, err := types.NewDuplicateVoteEvidence(
		exampleVote(1),
		exampleVote(2),
		defaultEvidenceTime,
		valSet,
	)
	require.NoError(t, err)

	testCases := []struct {
		testName     string
		evidenceList []types.Evidence
		expBytes     string
	}{
		{"DuplicateVoteEvidence", []types.Evidence{dupl}, "0a85020a82020a79080210031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb031279080110031802224a0a208b01023386c371778ecb6368573e539afc3cc860ec3a2f614e54fe5652f4fc80122608c0843d122072db3d959635dff1bb567bedaa70573392c5159666a3f8caf11e413aac52207a2a0b08b1d381d20510809dca6f32146af1f4111082efb388211bc72c55bcd61e9ac3d538d5bb03180a200a2a060880dbaae105"},
	}

	for _, tc := range testCases {
		tc := tc

		evi := make([]tmproto.Evidence, len(tc.evidenceList))
		for i := 0; i < len(tc.evidenceList); i++ {
			ev, err := types.EvidenceToProto(tc.evidenceList[i])
			require.NoError(t, err, tc.testName)
			evi[i] = *ev
		}

		epl := tmproto.EvidenceList{
			Evidence: evi,
		}

		bz, err := epl.Marshal()
		require.NoError(t, err, tc.testName)

		require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName)
	}
}