https://github.com/tendermint/tendermint
Revision b9508ffecba0ded9b3a401f1acaba67e273ff053 authored by Sean Braithwaite on 27 May 2019, 19:44:56 UTC, committed by Ethan Buchman on 27 May 2019, 19:44:56 UTC
* Peer behaviour test tweaks:

    Address the remaining test issues mentioned in ##3552 notably:
    + switch to p2p_test package
    + Use `Error` instead of `Errorf` when not using formatting
    + Add expected/got errors to
    `TestMockPeerBehaviourReporterConcurrency` test

* Peer behaviour equal behaviours test

    + slices of PeerBehaviours should be compared as histograms to
    ensure they have the same set of PeerBehaviours at the same
    freequncy.

* TestEqualPeerBehaviours:

    + Add tests for the equivalence between sets of PeerBehaviours
1 parent a6ac611
Raw File
Tip revision: b9508ffecba0ded9b3a401f1acaba67e273ff053 authored by Sean Braithwaite on 27 May 2019, 19:44:56 UTC
[p2p] Peer behaviour test tweaks (#3662)
Tip revision: b9508ff
reactor.go
package evidence

import (
	"fmt"
	"reflect"
	"time"

	amino "github.com/tendermint/go-amino"

	clist "github.com/tendermint/tendermint/libs/clist"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/types"
)

const (
	EvidenceChannel = byte(0x38)

	maxMsgSize = 1048576 // 1MB TODO make it configurable

	broadcastEvidenceIntervalS = 60  // broadcast uncommitted evidence this often
	peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
)

// EvidenceReactor handles evpool evidence broadcasting amongst peers.
type EvidenceReactor struct {
	p2p.BaseReactor
	evpool   *EvidencePool
	eventBus *types.EventBus
}

// NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
	evR := &EvidenceReactor{
		evpool: evpool,
	}
	evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
	return evR
}

// SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *EvidenceReactor) SetLogger(l log.Logger) {
	evR.Logger = l
	evR.evpool.SetLogger(l)
}

// GetChannels implements Reactor.
// It returns the list of channels for this reactor.
func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
	return []*p2p.ChannelDescriptor{
		{
			ID:       EvidenceChannel,
			Priority: 5,
		},
	}
}

// AddPeer implements Reactor.
func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
	go evR.broadcastEvidenceRoutine(peer)
}

// Receive implements Reactor.
// It adds any received evidence to the evpool.
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
	msg, err := decodeMsg(msgBytes)
	if err != nil {
		evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
		evR.Switch.StopPeerForError(src, err)
		return
	}

	if err = msg.ValidateBasic(); err != nil {
		evR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
		evR.Switch.StopPeerForError(src, err)
		return
	}

	evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)

	switch msg := msg.(type) {
	case *EvidenceListMessage:
		for _, ev := range msg.Evidence {
			err := evR.evpool.AddEvidence(ev)
			if err != nil {
				evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
				// punish peer
				evR.Switch.StopPeerForError(src, err)
			}
		}
	default:
		evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
	}
}

// SetEventSwitch implements events.Eventable.
func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
	evR.eventBus = b
}

// Modeled after the mempool routine.
// - Evidence accumulates in a clist.
// - Each peer has a routien that iterates through the clist,
// sending available evidence to the peer.
// - If we're waiting for new evidence and the list is not empty,
// start iterating from the beginning again.
func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
	var next *clist.CElement
	for {
		// This happens because the CElement we were looking at got garbage
		// collected (removed). That is, .NextWait() returned nil. Go ahead and
		// start from the beginning.
		if next == nil {
			select {
			case <-evR.evpool.EvidenceWaitChan(): // Wait until evidence is available
				if next = evR.evpool.EvidenceFront(); next == nil {
					continue
				}
			case <-peer.Quit():
				return
			case <-evR.Quit():
				return
			}
		}

		ev := next.Value.(types.Evidence)
		msg, retry := evR.checkSendEvidenceMessage(peer, ev)
		if msg != nil {
			success := peer.Send(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
			retry = !success
		}

		if retry {
			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
			continue
		}

		afterCh := time.After(time.Second * broadcastEvidenceIntervalS)
		select {
		case <-afterCh:
			// start from the beginning every tick.
			// TODO: only do this if we're at the end of the list!
			next = nil
		case <-next.NextWaitChan():
			// see the start of the for loop for nil check
			next = next.Next()
		case <-peer.Quit():
			return
		case <-evR.Quit():
			return
		}
	}
}

// Returns the message to send the peer, or nil if the evidence is invalid for the peer.
// If message is nil, return true if we should sleep and try again.
func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) {
	// make sure the peer is up to date
	evHeight := ev.Height()
	peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
	if !ok {
		// Peer does not have a state yet. We set it in the consensus reactor, but
		// when we add peer in Switch, the order we call reactors#AddPeer is
		// different every time due to us using a map. Sometimes other reactors
		// will be initialized before the consensus reactor. We should wait a few
		// milliseconds and retry.
		return nil, true
	}

	// NOTE: We only send evidence to peers where
	// peerHeight - maxAge < evidenceHeight < peerHeight
	maxAge := evR.evpool.State().ConsensusParams.Evidence.MaxAge
	peerHeight := peerState.GetHeight()
	if peerHeight < evHeight {
		// peer is behind. sleep while he catches up
		return nil, true
	} else if peerHeight > evHeight+maxAge {
		// evidence is too old, skip
		// NOTE: if evidence is too old for an honest peer,
		// then we're behind and either it already got committed or it never will!
		evR.Logger.Info("Not sending peer old evidence", "peerHeight", peerHeight, "evHeight", evHeight, "maxAge", maxAge, "peer", peer)
		return nil, false
	}

	// send evidence
	msg = &EvidenceListMessage{[]types.Evidence{ev}}
	return msg, false
}

// PeerState describes the state of a peer.
type PeerState interface {
	GetHeight() int64
}

//-----------------------------------------------------------------------------
// Messages

// EvidenceMessage is a message sent or received by the EvidenceReactor.
type EvidenceMessage interface {
	ValidateBasic() error
}

func RegisterEvidenceMessages(cdc *amino.Codec) {
	cdc.RegisterInterface((*EvidenceMessage)(nil), nil)
	cdc.RegisterConcrete(&EvidenceListMessage{},
		"tendermint/evidence/EvidenceListMessage", nil)
}

func decodeMsg(bz []byte) (msg EvidenceMessage, err error) {
	if len(bz) > maxMsgSize {
		return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
	}
	err = cdc.UnmarshalBinaryBare(bz, &msg)
	return
}

//-------------------------------------

// EvidenceListMessage contains a list of evidence.
type EvidenceListMessage struct {
	Evidence []types.Evidence
}

// ValidateBasic performs basic validation.
func (m *EvidenceListMessage) ValidateBasic() error {
	for i, ev := range m.Evidence {
		if err := ev.ValidateBasic(); err != nil {
			return fmt.Errorf("Invalid evidence (#%d): %v", i, err)
		}
	}
	return nil
}

// String returns a string representation of the EvidenceListMessage.
func (m *EvidenceListMessage) String() string {
	return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
}
back to top