https://gitlab.com/tezos/tezos
Tip revision: 40e26e826a34362bcdf350e4ed6db751419ef3f2 authored by Sylvain R. on 26 January 2024, 11:02:47 UTC
EVM/Sequencer: specify delayed inbox in pp
EVM/Sequencer: specify delayed inbox in pp
Tip revision: 40e26e8
test_unit.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
(* Testing
-------
Component: Gossipsub
Invocation: dune exec test/test_gossipsub.exe -- --file test_unit.ml
Subject: Unit tests for gossipsub
*)
open Test_gossipsub_shared
open Gossipsub_intf
open Tezt
open Tezt_core.Base
module Peer = C.Subconfig.Peer
module Topic = C.Subconfig.Topic
module Message_id = C.Subconfig.Message_id
module Message = C.Subconfig.Message
(** [assert_output ~__LOC__ actual expected] fails the current test, reporting
    the location given by the caller, unless [actual] equals [expected]. *)
let assert_output ~__LOC__ actual expected =
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/5079
     Use non-polymorphic compare and print actual/expected output on failure. *)
  match actual = expected with
  | true -> ()
  | false -> Test.fail ~__LOC__ "Assert for output failed."
(** [assert_subscribed_topics ~__LOC__ ~peer ~expected_topics state] checks
    that the topics reported by the introspection view of [state] for [peer]
    are exactly [expected_topics] (same elements, same order). *)
let assert_subscribed_topics ~__LOC__ ~peer ~expected_topics state =
  let view = GS.Introspection.view state in
  let subscribed = GS.Introspection.get_subscribed_topics peer view in
  Check.(
    (subscribed = expected_topics)
      (list string)
      ~error_msg:"Expected %R, got %L"
      ~__LOC__)
(** [assert_fanout_size ~__LOC__ ~topic ~expected_size state] checks that the
    number of fanout peers recorded for [topic] in [state] is
    [expected_size]. *)
let assert_fanout_size ~__LOC__ ~topic ~expected_size state =
  let actual_size =
    GS.Introspection.(get_fanout_peers topic (view state)) |> List.length
  in
  Check.(
    (actual_size = expected_size) int ~error_msg:"Expected %R, got %L" ~__LOC__)
(** [assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state]
    looks up [message_id] on behalf of [peer] in the message cache exposed by
    the introspection view of [state]. The test fails if the cache has no
    entry for [message_id], or if the cached message differs from
    [expected_message].

    Note: a new message cache state is returned when inspecting it, but this
    function does not return this updated state! *)
let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state =
  let view = GS.Introspection.view state in
  match
    GS.Introspection.Message_cache.get_message_for_peer
      peer
      message_id
      view.message_cache
  with
  | None ->
      (* Forward the location of the caller, as the other assertion helpers
         of this file do, so that the failure points at the failing test. *)
      Test.fail
        ~__LOC__
        "Expected entry in message cache for message id %d"
        message_id
  | Some (_message_cache_state, message, _access) ->
      Check.(
        (message = expected_message)
          string
          ~error_msg:"Expected %R, got %L"
          ~__LOC__)
(** [assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included state] checks
    that the membership of [peer] in the mesh of [topic] (as reported by the
    introspection view of [state]) matches [is_included]. *)
let assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included state =
  let mesh_peers =
    state |> GS.Introspection.view
    |> GS.Introspection.get_peers_in_topic_mesh topic
  in
  let found = List.mem ~equal:Int.equal peer mesh_peers in
  Check.((found = is_included) bool ~error_msg:"Expected %R, got %L" ~__LOC__)
(** [assert_mesh_size ~__LOC__ ~topic ~expected_size state] checks that the
    mesh of [topic] in [state] contains exactly [expected_size] peers. *)
let assert_mesh_size ~__LOC__ ~topic ~expected_size state =
  let actual_size =
    GS.Introspection.(get_peers_in_topic_mesh topic (view state))
    |> List.length
  in
  Check.(
    (actual_size = expected_size) int ~error_msg:"Expected %R, got %L" ~__LOC__)
(** [assert_iwant_output_success ~__LOC__ output] returns the routed message
    ids carried by [output] when it is [On_iwant_messages_to_route], and fails
    the test when it is [Iwant_from_peer_with_low_score]. *)
let assert_iwant_output_success ~__LOC__ = function
  | GS.On_iwant_messages_to_route {routed_message_ids} -> routed_message_ids
  | Iwant_from_peer_with_low_score _ ->
      Test.fail ~__LOC__ "Expected IWant to succeed."
(** [assert_topic_mesh ~__LOC__ ~topic ~expected_peers state] checks that the
    list of peers in the mesh of [topic] is exactly [expected_peers] (same
    elements, same order). *)
let assert_topic_mesh ~__LOC__ ~topic ~expected_peers state =
  let view = GS.Introspection.view state in
  let mesh_peers = GS.Introspection.get_peers_in_topic_mesh topic view in
  Check.(
    (mesh_peers = expected_peers)
      (list int)
      ~error_msg:"Expected %R, got %L"
      ~__LOC__)
(** [assert_peer_score ~__LOC__ ~expected_score peer state] checks that the
    score of [peer] in [state], converted to a float, equals
    [expected_score]. *)
let assert_peer_score ~__LOC__ ~expected_score peer state =
  let score = GS.Introspection.(get_peer_score peer (view state)) in
  let actual_score = GS.Score.Introspection.to_float score in
  Check.(
    (actual_score = expected_score)
      float
      ~error_msg:"Expected score %R, got %L"
      ~__LOC__)
(** [many_peers limits] is [4 * limits.degree_optimal + 1]: the number of
    peers used by the tests that need more peers than a mesh holds. *)
let many_peers limits =
  let optimal = limits.degree_optimal in
  1 + (4 * optimal)
(** [make_peers ~number] is the list of peers [0; 1; ...; number - 1]. The
    result of [List.init] is unwrapped with [WithExceptions.Result.get_ok]. *)
let make_peers ~number =
  let peers = List.init ~when_negative_length:() number (fun index -> index) in
  WithExceptions.Result.get_ok ~loc:__LOC__ peers
(** [add_and_subscribe_peers topics peers ~to_subscribe ?direct ?outbound state]
    adds every peer of [peers] to the gossipsub connections of [state], then
    subscribes it to each topic of [topics] for which
    [to_subscribe (peer, topic)] holds.

    [direct] and [outbound] decide, per peer, the flags passed to
    [GS.add_peer]; both default to [fun _ -> false].

    Every [GS.add_peer] is asserted to output [Peer_added] and every
    [GS.handle_subscribe] to output [Subscribed]. *)
let add_and_subscribe_peers (topics : Topic.t list) (peers : Peer.t list)
    ~(to_subscribe : Peer.t * Topic.t -> bool)
    ?(direct : Peer.t -> bool = fun _ -> false)
    ?(outbound : Peer.t -> bool = fun _ -> false) state =
  (* Subscribe [peer] to [topic] when requested; otherwise leave the state
     untouched. *)
  let subscribe peer state topic =
    if to_subscribe (peer, topic) then (
      let state, output = GS.handle_subscribe {topic; peer} state in
      assert_output ~__LOC__ output Subscribed ;
      state)
    else state
  in
  (* Register [peer] as a connection, then process its subscriptions. *)
  let add_and_subscribe state peer =
    let state, output =
      GS.add_peer {direct = direct peer; outbound = outbound peer; peer} state
    in
    assert_output ~__LOC__ output Peer_added ;
    List.fold_left (subscribe peer) state topics
  in
  List.fold_left add_and_subscribe state peers
(** [init_state ~rng ~limits ~parameters ~peers ~topics ?to_join ?direct
    ?outbound ~to_subscribe ()] builds a fresh gossipsub state with
    [GS.make], adds and subscribes [peers] via [add_and_subscribe_peers],
    and finally joins each topic of [topics] for which [to_join] holds
    (all of them by default). The outputs of [GS.join] are discarded. *)
let init_state ~rng ~limits ~parameters ~peers ~topics
    ?(to_join : Topic.t -> bool = fun _ -> true)
    ?(direct : Peer.t -> bool = fun _ -> false)
    ?(outbound : Peer.t -> bool = fun _ -> false)
    ~(to_subscribe : Peer.t * Topic.t -> bool) () =
  (* Join [topic] only when the caller asked for it. *)
  let join_if_requested state topic =
    if not (to_join topic) then state
    else
      let joined_state, _output = GS.join {topic} state in
      joined_state
  in
  let fresh_state = GS.make rng limits parameters in
  let connected_state =
    add_and_subscribe_peers
      topics
      peers
      ~to_subscribe
      ~direct
      ~outbound
      fresh_state
  in
  List.fold_left join_if_requested connected_state topics
(** [gen_message ()] returns a fresh [(message_id, message)] pair. Ids come
    from a private counter starting at 0 and incremented on every call; the
    message is the string "message" followed by the id. *)
let gen_message =
  let next_id = ref 0 in
  fun () ->
    let message_id = !next_id in
    next_id := message_id + 1 ;
    (message_id, "message" ^ string_of_int message_id)
(** Test that grafting an unknown topic is ignored: a single peer is added,
    no topic is joined, and a graft from that peer for "unknown_topic" must
    output [Unsubscribed_topic].

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4367 *)
let test_ignore_graft_from_unknown_topic rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Ignore graft from unknown topic"
    ~tags:["gossipsub"; "graft"]
  @@ fun () ->
  let peers = make_peers ~number:1 in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[]
      ~to_subscribe:(fun _ -> false)
      ()
  in
  let peers = Array.of_list peers in
  let _state, output =
    GS.handle_graft {peer = peers.(0); topic = "unknown_topic"} state
  in
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/5079
     Use Tezt.Check to assert output *)
  match output with
  | Unsubscribed_topic -> unit
  | _ ->
      (* Report the location of the failure, like the other tests of this
         file do. *)
      Tezt.Test.fail
        ~__LOC__
        "Expected output [Unsubscribed_topic]; got [%a]"
        GS.pp_output
        output
(** Test that:
    - subscribing a known peer to a topic adds the topic to its subscriptions;
    - subscribing an unknown peer to a topic does nothing;
    - unsubscribing a peer from a topic removes the topic from its
      subscriptions and removes the peer from the mesh of that topic;
    - unsubscribing a peer from a topic it is not subscribed to has no effect.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L852
*)
let test_handle_received_subscriptions rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Handle received subscriptions"
    ~tags:["gossipsub"; "subscribe"]
  @@ fun () ->
  let known_peers = make_peers ~number:2 in
  let state =
    init_state
      ~peers:known_peers
      ~rng
      ~limits
      ~parameters
      ~topics:["topic1"; "topic2"; "topic3"; "topic4"]
      ~to_subscribe:(fun _ -> false)
      ()
  in
  let known_peers = Array.of_list known_peers in
  let peer0 = known_peers.(0) and peer1 = known_peers.(1) in
  (* This peer is never added to the connections. *)
  let unknown_peer = 99 in
  (* Each peer sends three subscriptions followed by one unsubscription for
     a topic it never subscribed to. *)
  let send_subscriptions state peer =
    let state =
      List.fold_left
        (fun state topic ->
          let state, _ = GS.handle_subscribe {topic; peer} state in
          state)
        state
        ["topic1"; "topic2"; "topic3"]
    in
    let state, _ = GS.handle_unsubscribe {topic = "topic4"; peer} state in
    state
  in
  let state =
    List.fold_left send_subscriptions state [peer0; peer1; unknown_peer]
  in
  (* Both known peers end up subscribed to the three topics. *)
  let three_topics = ["topic1"; "topic2"; "topic3"] in
  assert_subscribed_topics
    ~__LOC__
    ~peer:peer0
    ~expected_topics:three_topics
    state ;
  assert_subscribed_topics
    ~__LOC__
    ~peer:peer1
    ~expected_topics:three_topics
    state ;
  (* The unknown peer has no subscription at all. *)
  assert_subscribed_topics ~__LOC__ ~peer:unknown_peer ~expected_topics:[] state ;
  (* A heartbeat fills the mesh: both known peers are in the mesh of
     "topic1" afterwards. *)
  let state, _ = GS.heartbeat state in
  assert_topic_mesh
    ~__LOC__
    ~topic:"topic1"
    ~expected_peers:[peer0; peer1]
    state ;
  (* Peer 0 unsubscribes from "topic1": it keeps two subscriptions and
     leaves the mesh of "topic1". *)
  let state, _ = GS.handle_unsubscribe {topic = "topic1"; peer = peer0} state in
  assert_subscribed_topics
    ~__LOC__
    ~peer:peer0
    ~expected_topics:["topic2"; "topic3"]
    state ;
  assert_topic_mesh ~__LOC__ ~topic:"topic1" ~expected_peers:[peer1] state ;
  unit
(* Joining a topic should:
   - fill up the mesh with known gossipsub peers subscribed to the topic;
   - return GRAFT requests for all the peers added to the mesh.

   Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L512
*)
let test_join_adds_peers_to_mesh rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test join adds peers to mesh"
    ~tags:["gossipsub"; "join"]
  @@ fun () ->
  let topic = "topic0" in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:(make_peers ~number:(many_peers limits))
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* [init_state] already joined the topic: leave it first so that the join
     below actually exercises the join logic. *)
  let state, _ = GS.leave {topic} state in
  let state, grafted_peers =
    match GS.join {topic} state with
    | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft)
    | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed"
  in
  (* The mesh should now hold [degree_optimal] peers... *)
  let mesh_size =
    GS.Introspection.(get_peers_in_topic_mesh topic (view state))
    |> List.length
  in
  Check.(
    (mesh_size = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* ... and as many GRAFT requests should have been emitted. *)
  let graft_count = List.length grafted_peers in
  Check.(
    (graft_count = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  unit
(* Joining a topic should:
   - remove peers from fanout[topic];
   - add any fanout[topic] peers to the mesh;
   - fill up the mesh with known gossipsub peers in the topic;
   - return GRAFT requests for all the peers added to the mesh.

   Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L512
*)
let test_join_adds_fanout_to_mesh rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test join adds fanout to mesh"
    ~tags:["gossipsub"; "join"; "fanout"]
  @@ fun () ->
  let topic = "topic0" in
  let topics = [topic] in
  (* We initialize the state with [degree_optimal / 2] peers so the mesh
     will not be filled with just fanout peers when we call [GS.join]. *)
  let init_peers, additional_peers =
    List.split_n (limits.degree_optimal / 2)
    @@ make_peers ~number:(many_peers limits)
  in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:init_peers
      ~topics
      ~to_join:(fun _ -> false)
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Publish to the topic. We did not join it, so the peers should be added
     to the fanout map. *)
  let state, _ =
    GS.publish_message {topic; message_id = 0; message = "message"} state
  in
  (* Check that all [init_peers] have been added to the fanout. *)
  let fanout_peers = GS.Introspection.(get_fanout_peers topic (view state)) in
  Check.(
    (List.length fanout_peers = limits.degree_optimal / 2)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* Add additional peers. *)
  let state =
    add_and_subscribe_peers
      topics
      additional_peers
      state
      ~to_subscribe:(fun _ -> true)
  in
  (* Join the topic. *)
  let state, to_graft =
    match GS.join {topic} state with
    | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft)
    | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed"
  in
  let peers_in_topic =
    GS.Introspection.(get_peers_in_topic_mesh topic (view state))
  in
  (* All [degree_optimal / 2] fanout peers should have been added to the mesh,
     along with [degree_optimal / 2] more from the pool. *)
  Check.(
    (List.length peers_in_topic = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  List.iter
    (fun peer ->
      if not @@ List.mem ~equal:Int.equal peer peers_in_topic then
        (* Report the location of the failure, like the other failure paths
           of this test. *)
        Test.fail
          ~__LOC__
          "Fanout peer %d should be included in the topic mesh [%a]"
          peer
          (Format.pp_print_list
             ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ")
             Format.pp_print_int)
          peers_in_topic
      else ())
    fanout_peers ;
  (* There should be [degree_optimal] GRAFT messages. *)
  Check.(
    (List.length to_graft = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* Check that the fanout map has been cleared. *)
  let fanout_peers = GS.Introspection.(get_fanout_peers topic (view state)) in
  Check.(
    (List.length fanout_peers = 0) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
  unit
(** Tests that publishing a message:
    - returns [degree_optimal] peers to publish to;
    - inserts the message into the message cache.

    NOTE(review): the original description speaks of a subscribed/joined
    topic, but the state is built with [~to_join:(fun _ -> false)], so the
    topic is not joined when publishing. Confirm which one is intended.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L629
*)
let test_publish_without_flood_publishing rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test publish without flood publishing"
    ~tags:["gossipsub"; "publish"]
  @@ fun () ->
  let topic = "test_publish" in
  let peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_join:(fun _ -> false)
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let message = "some_data" in
  let message_id = 0 in
  let state, output = GS.publish_message {topic; message_id; message} state in
  let recipients =
    match output with
    | Publish_message {to_publish} -> to_publish
    | Already_published -> Test.fail ~__LOC__ "Publish should succeed."
  in
  (* The message should be sent to [degree_optimal] peers. *)
  let recipient_count = Peer.Set.cardinal recipients in
  Check.(
    (recipient_count = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* [message_id] should be added to the message cache. *)
  assert_in_message_cache
    ~__LOC__
    message_id
    ~peer:(Stdlib.List.hd peers)
    ~expected_message:message
    state ;
  unit
(** Tests that publishing to a topic we have not joined:
    - populates the fanout peers of the topic;
    - returns the peers to publish to;
    - inserts the message into the message cache.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L715
*)
let test_fanout rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test fanout"
    ~tags:["gossipsub"; "publish"; "fanout"]
  @@ fun () ->
  let topic = "topic" in
  let peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_join:(fun _ -> false)
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Leave the topic, then publish to it. *)
  let state, _ = GS.leave {topic} state in
  let message = "some data" in
  let message_id = 0 in
  let state, output = GS.publish_message {topic; message_id; message} state in
  let recipients =
    match output with
    | Publish_message {to_publish} -> to_publish
    | Already_published -> Test.fail ~__LOC__ "Publish should succeed."
  in
  (* The fanout should contain [degree_optimal] peers. *)
  assert_fanout_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ;
  (* The message should be sent to [degree_optimal] peers. *)
  let recipient_count = Peer.Set.cardinal recipients in
  Check.(
    (recipient_count = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* [message_id] should be added to the message cache. *)
  assert_in_message_cache
    ~__LOC__
    message_id
    ~peer:(Stdlib.List.hd peers)
    ~expected_message:message
    state ;
  unit
(** Tests that receiving a message for a joined topic:
    - returns [degree_optimal] peers to route the message to;
    - inserts the message into the message cache. *)
let test_receiving_message rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test receiving message"
    ~tags:["gossipsub"; "receiving_message"]
  @@ fun () ->
  let topic = "test" in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:(make_peers ~number:(many_peers limits))
      ~topics:[topic]
      ~to_join:(fun _ -> true)
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let sender = 99 in
  let message = "some_data" in
  let message_id = 0 in
  (* Receive a message for the joined topic. *)
  let state, output =
    GS.handle_receive_message {sender; topic; message_id; message} state
  in
  let recipients =
    match output with
    | Route_message {to_route} -> to_route
    | Already_received | Not_subscribed | Invalid_message | Unknown_validity ->
        Test.fail ~__LOC__ "Handling of received message should succeed."
  in
  (* The message should be routed to [degree_optimal] peers. *)
  let recipient_count = Peer.Set.cardinal recipients in
  Check.(
    (recipient_count = limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* [message_id] should be added to the message cache. *)
  assert_in_message_cache
    ~__LOC__
    message_id
    ~peer:sender
    ~expected_message:message
    state ;
  unit
(** Tests that a message received for a topic we are not subscribed to is
    not routed: handling it must output [Not_subscribed]. *)
let test_receiving_message_for_unsusbcribed_topic rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test receiving message for unsubscribed topic"
    ~tags:["gossipsub"; "receive_message"; "fanout"]
  @@ fun () ->
  let topic = "topic" in
  let peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_join:(fun _ -> false)
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Leave the topic, then receive a message for it from the first peer. *)
  let state, _ = GS.leave {topic} state in
  let received =
    {sender = Stdlib.List.hd peers; topic; message_id = 0; message = "some data"}
  in
  let _state, output = GS.handle_receive_message received state in
  match output with
  | Not_subscribed -> unit
  | Already_received | Route_message _ | Invalid_message | Unknown_validity ->
      Test.fail
        ~__LOC__
        "Handling of received message should fail with [Not_subscribed]."
(** Tests that a peer is added to our mesh on graft when both of us are
    joined/subscribed to the same topic.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1250
*)
let test_handle_graft_for_joined_topic rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle graft for subscribed topic"
    ~tags:["gossipsub"; "graft"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let peer = (Array.of_list all_peers).(7) in
  (* Prune the peer with a zero backoff to make sure it is not in the mesh
     before the graft. *)
  let prune = {peer; topic; px = Seq.empty; backoff = Milliseconds.Span.zero} in
  let state, _ = GS.handle_prune prune state in
  assert_mesh_inclusion ~__LOC__ ~peer ~topic state ~is_included:false ;
  (* After the graft, the peer must be part of the mesh. *)
  let state, _ = GS.handle_graft {peer; topic} state in
  assert_mesh_inclusion ~__LOC__ ~peer ~topic state ~is_included:true ;
  unit
(** Tests that a peer is not added to our mesh on graft when we have not
    joined the grafted topic: the graft must output [Unsubscribed_topic].

    NOTE(review): the mesh inclusion is checked for [topic], not for the
    grafted "not joined topic". Confirm this is the intended assertion.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1263
*)
let test_handle_graft_for_not_joined_topic rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle graft for not joined topic"
    ~tags:["gossipsub"; "graft"]
  @@ fun () ->
  let topic = "topic" in
  let peer_number = many_peers limits in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:(make_peers ~number:peer_number)
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Add a new peer, subscribed to [topic]... *)
  let new_peer = peer_number + 1 in
  let state =
    add_and_subscribe_peers
      [topic]
      [new_peer]
      ~to_subscribe:(fun _ -> true)
      state
  in
  (* ... and let it graft a topic we never joined. *)
  let graft = {peer = new_peer; topic = "not joined topic"} in
  let state, output = GS.handle_graft graft state in
  (* The graft must not have taken effect. *)
  assert_mesh_inclusion ~__LOC__ ~peer:new_peer ~topic state ~is_included:false ;
  assert_output ~__LOC__ output Unsubscribed_topic ;
  unit
(** Tests that a graft received from a peer that is not subscribed to the
    topic results in that peer being subscribed to the topic. *)
let test_handle_graft_from_unsubscribed_peer rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle graft without subscribe"
    ~tags:["gossipsub"; "graft"]
  @@ fun () ->
  let topic = "topic" in
  let single_peer = make_peers ~number:1 in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:single_peer
      ~topics:[topic]
      ~to_subscribe:(fun _ -> false)
      ()
  in
  let peer = (Array.of_list single_peer).(0) in
  (* Before the graft, the peer has no subscription. *)
  assert_subscribed_topics ~__LOC__ ~peer ~expected_topics:[] state ;
  let state, _output = GS.handle_graft {peer; topic} state in
  (* After the graft, the peer is subscribed to the topic. *)
  assert_subscribed_topics ~__LOC__ ~peer ~expected_topics:[topic] state ;
  unit
(** Tests that handling a prune removes the peer from our mesh.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1323
*)
let test_handle_prune_peer_in_mesh rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test prune removes peer from mesh"
    ~tags:["gossipsub"; "prune"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let peer = (Array.of_list all_peers).(7) in
  (* Graft first, to be sure that the peer is in the mesh. *)
  let state, _ = GS.handle_graft {peer; topic} state in
  assert_mesh_inclusion ~__LOC__ ~peer ~topic state ~is_included:true ;
  (* Then prune the peer and check that it left the mesh. *)
  let prune = {peer; topic; px = Seq.empty; backoff = limits.prune_backoff} in
  let state, _ = GS.handle_prune prune state in
  assert_mesh_inclusion ~__LOC__ ~peer ~topic state ~is_included:false ;
  unit
(** Tests that the maintenance heartbeat adds peers to a mesh that fell below
    [degree_optimal]: two mesh peers are pruned, and the next heartbeat must
    graft two peers to refill the mesh.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1745
*)
let test_mesh_addition rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test mesh addition in maintainance"
    ~tags:["gossipsub"; "heartbeat"]
  @@ fun () ->
  let topic = "topic" in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:(make_peers ~number:(limits.degree_optimal + 2))
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  assert_mesh_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ;
  (* Remove two peers from the mesh by handling a prune from each of them. *)
  let mesh_peers =
    GS.Introspection.(get_peers_in_topic_mesh topic (view state))
  in
  let prune state peer =
    let state, _ =
      GS.handle_prune
        {peer; topic; px = Seq.empty; backoff = limits.prune_backoff}
        state
    in
    state
  in
  let state = List.fold_left prune state (List.take_n 2 mesh_peers) in
  assert_mesh_size
    ~__LOC__
    ~topic
    ~expected_size:(limits.degree_optimal - 2)
    state ;
  (* The heartbeat should emit two graft requests to fill the mesh. *)
  let state, Heartbeat {to_graft; _} = GS.heartbeat state in
  let grafted_peers = List.map fst (Peer.Map.bindings to_graft) in
  let graft_count = List.length grafted_peers in
  Check.((graft_count = 2) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
  (* The mesh is back to [degree_optimal] peers, and it contains the newly
     grafted peers. *)
  assert_mesh_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ;
  List.iter
    (fun peer ->
      assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included:true state)
    grafted_peers ;
  unit
(** Tests that the maintenance heartbeat removes peers from an oversized
    mesh: [degree_high + 10] outbound peers are grafted, and the next
    heartbeat must prune enough of them to bring the mesh back to
    [degree_optimal].

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1780
*)
let test_mesh_subtraction rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test mesh subtraction in maintainance"
    ~tags:["gossipsub"; "heartbeat"]
  @@ fun () ->
  let topic = "topic" in
  let peer_number = limits.degree_high + 10 in
  let peers = make_peers ~number:peer_number in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ~outbound:(fun _ -> true)
      ()
  in
  (* Graft all the peers. This works because the connections are outbound. *)
  let graft state peer =
    let state, _ = GS.handle_graft {peer; topic} state in
    state
  in
  let state = List.fold_left graft state peers in
  assert_mesh_size ~__LOC__ ~topic ~expected_size:peer_number state ;
  (* The heartbeat should emit enough prune requests to bring the mesh size
     back to [degree_optimal]. *)
  let state, Heartbeat {to_prune; _} = GS.heartbeat state in
  let pruned_peers = List.map fst (Peer.Map.bindings to_prune) in
  let prune_count = List.length pruned_peers in
  Check.(
    (prune_count = peer_number - limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* The mesh holds [degree_optimal] peers, none of which was pruned. *)
  assert_mesh_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ;
  List.iter
    (fun peer ->
      assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included:false state)
    pruned_peers ;
  unit
(** Tests that the heartbeat does not graft a peer that is still within its
    backoff period: after a prune with a 30 second backoff, no graft may be
    emitted during the first 31 one-second ticks, and exactly one graft must
    be emitted on the 32nd tick.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1943
*)
let test_do_not_graft_within_backoff_period rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Do not graft within backoff period"
    ~tags:["gossipsub"; "heartbeat"; "graft"; "prune"]
  @@ fun () ->
  let topic = "topic" in
  let test_limits =
    {
      limits with
      (* Run backoff clearing on every heartbeat tick. *)
      backoff_cleanup_ticks = 1;
      (* We will run the heartbeat tick on each time tick to simplify the
         test. *)
      heartbeat_interval = Milliseconds.Span.of_int_s 1;
    }
  in
  (* Only one peer => mesh too small and will try to regraft as early as
     possible. *)
  let single_peer = make_peers ~number:1 in
  let state =
    init_state
      ~rng
      ~limits:test_limits
      ~parameters
      ~peers:single_peer
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let peer = (Array.of_list single_peer).(0) in
  (* Prune the peer with a backoff of 30 time ticks. *)
  let backoff = Milliseconds.of_int_s 30 in
  let state, _ = GS.handle_prune {peer; topic; px = Seq.empty; backoff} state in
  (* Elapse one time tick, run a heartbeat and check that it emits no graft. *)
  let tick_without_graft state tick_index =
    Time.elapse (Milliseconds.of_int_s 1) ;
    Log.info "%d time tick(s) elapsed..." tick_index ;
    let state, Heartbeat {to_graft; _} = GS.heartbeat state in
    let graft_count = List.length (Peer.Map.bindings to_graft) in
    Check.((graft_count = 0) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
    state
  in
  (* No graft should be emitted until 32 time ticks pass.
     The additional 2 time ticks is due to the "backoff slack". *)
  let tick_indices =
    List.init
      ~when_negative_length:()
      (Milliseconds.to_int_s backoff + 1)
      (fun index -> index + 1)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  let state = List.fold_left tick_without_graft state tick_indices in
  (* After elapsing one more second, the backoff should be cleared and the
     graft should be emitted. *)
  Time.elapse (Milliseconds.of_int_s 1) ;
  let _state, Heartbeat {to_graft; _} = GS.heartbeat state in
  let graft_count = List.length (Peer.Map.bindings to_graft) in
  Check.((graft_count = 1) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
  unit
(* Tests that leaving a topic introduces a backoff period, and that the
   heartbeat respects it: after a leave followed by a join, no graft may be
   emitted during the first [backoff_ticks + 1] heartbeats, and exactly one
   graft must be emitted on the following heartbeat.

   Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L2041
*)
let test_unsubscribe_backoff rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Unsubscribe backoff"
    ~tags:["gossipsub"; "heartbeat"; "join"; "leave"]
  @@ fun () ->
  let topic = "topic" in
  let topic_score_limits =
    GS.Score.Internal_for_tests.get_topic_params limits.score_limits topic
  in
  let activation = topic_score_limits.mesh_message_deliveries_activation in
  (* Number of ticks until the unsubscribe backoff expires. *)
  let backoff_ticks = 5 in
  (* We must set [heartbeat_interval] so that
     number_of_heartbeats_in_the_test * [heartbeat_interval]
     < [mesh_message_deliveries_activation]
     holds. This prevents surpassing [mesh_message_deliveries_activation]
     within the test, thus avoiding the activation of p3 penalty. Since
     number_of_heartbeats_in_the_test is [backoff_ticks + 2], the interval is
     computed as follows. *)
  let heartbeat_interval =
    let interval_ms =
      (Milliseconds.to_int_ms activation - 1) / (backoff_ticks + 2)
    in
    Milliseconds.of_int_ms interval_ms
  in
  (* Time required until the unsubscribe backoff expires. *)
  let unsubscribe_backoff =
    Milliseconds.(of_int_ms (backoff_ticks * to_int_ms heartbeat_interval))
  in
  let test_limits =
    {
      limits with
      heartbeat_interval;
      (* Run backoff clearing on every heartbeat tick. *)
      backoff_cleanup_ticks = 1;
      unsubscribe_backoff;
    }
  in
  (* Only one peer => mesh too small and will try to regraft as early as
     possible. *)
  let state =
    init_state
      ~rng
      ~limits:test_limits
      ~parameters
      ~peers:(make_peers ~number:1)
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Leave the topic, then join it again. *)
  let state, _ = GS.leave {topic} state in
  let state, _ = GS.join {topic} state in
  (* Elapse one heartbeat interval, run a heartbeat and check that it emits
     no graft. *)
  let tick_without_graft state tick_index =
    Time.elapse heartbeat_interval ;
    Log.info "%d time tick(s) elapsed..." tick_index ;
    let state, Heartbeat {to_graft; _} = GS.heartbeat state in
    let graft_count = List.length (Peer.Map.bindings to_graft) in
    Check.((graft_count = 0) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
    state
  in
  (* No graft should be emitted until
     [(backoff_ticks + 2) * heartbeat_interval] elapse. The additional 2
     [heartbeat_interval] is due to the "backoff slack". *)
  let tick_indices =
    List.init ~when_negative_length:() (backoff_ticks + 1) (fun index ->
        index + 1)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  let state = List.fold_left tick_without_graft state tick_indices in
  (* After elapsing one more [heartbeat_interval], the backoff should be
     cleared and the graft should be emitted. *)
  Time.elapse heartbeat_interval ;
  let _state, Heartbeat {to_graft; _} = GS.heartbeat state in
  let graft_count = List.length (Peer.Map.bindings to_graft) in
  Check.((graft_count = 1) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
  unit
(* Tests that, once the mesh is full, a graft is accepted from an outbound
   peer but not from an inbound one.

   Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L2254
*)
let test_accept_only_outbound_peer_grafts_when_mesh_full rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Accept only outbound peer grafts when mesh full"
    ~tags:["gossipsub"; "graft"; "outbound"]
  @@ fun () ->
  let topic = "topic" in
  let peers = make_peers ~number:limits.degree_high in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* Handle a graft from [peer], discarding the output. *)
  let graft state peer =
    let state, _ = GS.handle_graft {peer; topic} state in
    state
  in
  (* Graft all the peers: this should fill the mesh up to [degree_high]. *)
  let state = List.fold_left graft state peers in
  assert_mesh_size ~__LOC__ ~topic ~expected_size:limits.degree_high state ;
  (* Connect one inbound and one outbound peer. *)
  let inbound_peer = 99 in
  let outbound_peer = 98 in
  let state, _ =
    GS.add_peer {direct = false; outbound = false; peer = inbound_peer} state
  in
  let state, _ =
    GS.add_peer {direct = false; outbound = true; peer = outbound_peer} state
  in
  (* Both send a graft, the inbound peer first. *)
  let state = graft state inbound_peer in
  let state = graft state outbound_peer in
  (* Only the outbound peer should have been added to the mesh. *)
  assert_mesh_inclusion
    ~__LOC__
    ~topic
    ~peer:inbound_peer
    ~is_included:false
    state ;
  assert_mesh_inclusion
    ~__LOC__
    ~topic
    ~peer:outbound_peer
    ~is_included:true
    state ;
  unit
(* Tests that the number of kept outbound peers is at least [degree_out]
when removing peers from mesh in heartbeat.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L2291
*)
let test_do_not_remove_too_many_outbound_peers rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Do not remove too many outbound peers"
    ~tags:["gossipsub"; "heartbeat"; "outbound"]
  @@ fun () ->
  let topic = "topic" in
  (* Graft every peer of [peers] in turn, threading the state through. *)
  let graft_all peers state =
    List.fold_left
      (fun state peer ->
        let state, _ = GS.handle_graft {peer; topic} state in
        state)
      state
      peers
  in
  (* Create [degree_high] inbound peers and [degree_out] outbound peers. *)
  let inbound_peers, outbound_peers =
    make_peers ~number:(limits.degree_high + limits.degree_out)
    |> List.split_n limits.degree_high
  in
  (* Initiate the state with inbound peers. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:inbound_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ~outbound:(fun _ -> false)
      ()
  in
  (* Graft all the inbound peers.
     This works because the number of inbound peers is equal to [degree_high]. *)
  let state = graft_all inbound_peers state in
  (* Connect to all [degree_out] outbound peers. The grafts will be accepted since
     outbound connections are accepted even when the mesh is full. *)
  let state =
    add_and_subscribe_peers
      [topic]
      outbound_peers
      ~to_subscribe:(fun _ -> true)
      ~outbound:(fun _ -> true)
      state
  in
  let state = graft_all outbound_peers state in
  (* At this point the mesh should be overly full.
     It has [degree_high + degree_out] peers where the upper limit is [degree_high]. *)
  assert_mesh_size
    ~__LOC__
    ~topic
    ~expected_size:(limits.degree_high + limits.degree_out)
    state ;
  (* Run heartbeat. The state it returns is kept on purpose: the mesh
     inclusion checks below must look at the mesh as left by the heartbeat,
     not at the state that preceded it. *)
  let state, Heartbeat {to_prune; _} = GS.heartbeat state in
  (* There should be enough prune requests to bring back the mesh size to [degree_optimal]. *)
  let peers_to_prune =
    to_prune |> Peer.Map.bindings |> List.map (fun (peer, _topics) -> peer)
  in
  Check.(
    (List.length peers_to_prune
    = limits.degree_high + limits.degree_out - limits.degree_optimal)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  (* No outbound peer should have been pruned since pruning any of them would
     bring the number of outbound peers to below [degree_out]. *)
  List.iter
    (fun peer ->
      (* Outbound peer should continue to be in mesh after the heartbeat. *)
      assert_mesh_inclusion ~__LOC__ ~topic ~peer state ~is_included:true ;
      (* Should be no prune request for the outbound peer. *)
      if List.mem ~equal:Peer.equal peer peers_to_prune then
        Test.fail ~__LOC__ "Outbound peer should not be pruned."
      else ())
    outbound_peers ;
  unit
(* Tests that outbound peers are added to the mesh
if the number of outbound peers is below [degree_out].
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L2338
*)
let test_add_outbound_peers_if_min_is_not_satisfied rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Add outbound peers if min is not satisfied"
    ~tags:["gossipsub"; "heartbeat"; "outbound"]
  @@ fun () ->
  let topic = "topic" in
  (* Mesh size once the [degree_high] inbound peers are grafted, and the size
     expected after the heartbeat has added the [degree_out] outbound ones. *)
  let full_mesh_size = limits.degree_high in
  let final_mesh_size = full_mesh_size + limits.degree_out in
  let inbound_peers, outbound_peers =
    List.split_n full_mesh_size (make_peers ~number:final_mesh_size)
  in
  (* Only the inbound peers are known at start-up. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:inbound_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ~outbound:(fun _ -> false)
      ()
  in
  (* Every inbound peer grafts us; there are exactly [degree_high] of them. *)
  let state =
    inbound_peers
    |> List.fold_left
         (fun state peer -> fst (GS.handle_graft {peer; topic} state))
         state
  in
  (* The outbound peers connect and subscribe, but no graft is exchanged. *)
  let state =
    add_and_subscribe_peers
      [topic]
      outbound_peers
      ~to_subscribe:(fun _ -> true)
      ~outbound:(fun _ -> true)
      state
  in
  (* So far the mesh only holds the inbound peers. *)
  assert_mesh_size ~__LOC__ ~topic ~expected_size:full_mesh_size state ;
  let state, Heartbeat {to_prune; to_graft; _} = GS.heartbeat state in
  (* The heartbeat is expected to add the outbound peers on top of the
     inbound ones. *)
  assert_mesh_size ~__LOC__ ~topic ~expected_size:final_mesh_size state ;
  (* Count how many peers the heartbeat asks us to prune and to graft. *)
  let pruned_count = Peer.Map.cardinal to_prune in
  let grafted_count = Peer.Map.cardinal to_graft in
  Check.((pruned_count = 0) int ~error_msg:"Expected %R, got %L" ~__LOC__) ;
  Check.(
    (grafted_count = limits.degree_out)
      int
      ~error_msg:"Expected %R, got %L"
      ~__LOC__) ;
  unit
(* Tests that the correct message is returned when a peer asks for a message in our cache.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1025
*)
let test_handle_iwant_msg_cached rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle IWant for message in cache"
    ~tags:["gossipsub"; "iwant"; "cache"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* The peer at index 7 plays the role of the requester. *)
  let requester = (Array.of_list all_peers).(7) in
  let message = "some message" in
  let message_id = 3 in
  (* Publishing is what puts the message into the cache. *)
  let state, _ = GS.publish_message {topic; message; message_id} state in
  (* The requester now asks for that very message. *)
  let _state, output =
    GS.handle_iwant {peer = requester; message_ids = [message_id]} state
  in
  let answers = assert_iwant_output_success ~__LOC__ output in
  (* The answer for [message_id] must carry the published payload. *)
  match Message_id.Map.find message_id answers with
  | Some (`Message payload) ->
      Check.(
        (payload = message) string ~error_msg:"Expected %R, got %L" ~__LOC__) ;
      unit
  | None | Some (`Ignored | `Not_found | `Too_many_requests) ->
      Test.fail ~__LOC__ "Expected IWant to return the message in cache."
(* Tests that IWant stops returning the message after
[history_length] heartbeats as it is shifted out from the cache.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1081
*)
let test_handle_iwant_msg_cached_shifted rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle IWant message cache shifted"
    ~tags:["gossipsub"; "iwant"; "cache"; "heartbeat"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits:{limits with max_gossip_retransmission = 100}
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let requester = (Array.of_list all_peers).(7) in
  let message = "some message" in
  let message_id = 3 in
  (* Publishing is what puts the message into the cache. *)
  let state, _ = GS.publish_message {topic; message; message_id} state in
  (* One round: tick the heartbeat, request the message, and check that the
     answer matches what is expected at heartbeat number [heartbeat_count]. *)
  let round state heartbeat_count =
    let state, _output = GS.heartbeat state in
    let state, output =
      GS.handle_iwant {peer = requester; message_ids = [message_id]} state
    in
    let answers = assert_iwant_output_success ~__LOC__ output in
    (* The message is expected to be served only strictly before
       [history_length] heartbeats. *)
    let still_cached = heartbeat_count < limits.history_length in
    match (Message_id.Map.find message_id answers, still_cached) with
    | (None | Some (`Ignored | `Too_many_requests)), _ ->
        Test.fail ~__LOC__ "Expected `Message or `Not_found."
    | Some (`Message _), true | Some `Not_found, false ->
        (* The expected cases. *)
        state
    | Some (`Message _), false ->
        Test.fail
          ~__LOC__
          "Expected IWant to not return the message in cache at \
           heartbeat count %d."
          heartbeat_count
    | Some `Not_found, true ->
        Test.fail
          ~__LOC__
          "Expected IWant to not return `Not_found at heartbeat count \
           %d."
          heartbeat_count
  in
  (* Run [2 * history_length] rounds, numbered from 1. *)
  let heartbeat_counts =
    List.init ~when_negative_length:() (2 * limits.history_length) (fun i ->
        i + 1)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  let _state = List.fold_left round state heartbeat_counts in
  unit
(* Tests that we do not return a message when a peer asks for a message not in our cache.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1146
*)
let test_handle_iwant_msg_not_cached rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Test handle IWant for message not in cache"
    ~tags:["gossipsub"; "iwant"; "cache"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let requester = (Array.of_list all_peers).(7) in
  (* An identifier for which nothing was ever published in this test. *)
  let unknown_id = 99 in
  let _state, output =
    GS.handle_iwant {peer = requester; message_ids = [unknown_id]} state
  in
  let answers = assert_iwant_output_success ~__LOC__ output in
  (* Nothing is cached under [unknown_id], hence `Not_found is expected. *)
  match Message_id.Map.find unknown_id answers with
  | Some `Not_found -> unit
  | None | Some (`Ignored | `Message _ | `Too_many_requests) ->
      Test.fail ~__LOC__ "Expected IWant to return `Not_found."
(* Tests that receiving too many IWants from the same peer for the same message
results in ignoring the request.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4387
*)
let test_ignore_too_many_iwants_from_same_peer_for_same_message rng limits
    parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Ignore too many IWants from same peer for same message"
    ~tags:["gossipsub"; "iwant"; "cache"]
  @@ fun () ->
  let topic = "topic" in
  (* No peer at start-up, so the mesh is empty. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:[]
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* A single subscribed peer is added afterwards; it is never grafted here. *)
  let peer = 99 in
  let state =
    add_and_subscribe_peers [topic] [peer] ~to_subscribe:(fun _ -> true) state
  in
  (* Publishing is what puts the message into the cache. *)
  let message = "some message" in
  let message_id = 0 in
  let state, _ = GS.publish_message {topic; message; message_id} state in
  (* One request for [message_id]; [iwant_count] is the 1-based rank of the
     request. Requests up to [max_gossip_retransmission] must be served, the
     following ones must be rejected with `Too_many_requests. *)
  let request state iwant_count =
    let state, output =
      GS.handle_iwant {peer; message_ids = [message_id]} state
    in
    let answers = assert_iwant_output_success ~__LOC__ output in
    let within_limit = iwant_count <= limits.max_gossip_retransmission in
    match (Message_id.Map.find message_id answers, within_limit) with
    | (None | Some (`Not_found | `Ignored)), _ ->
        Test.fail
          ~__LOC__
          "IWant should be either accepted or ignored due to too many \
           requests."
    | Some (`Message _), true | Some `Too_many_requests, false ->
        (* The expected cases. *)
        state
    | Some (`Message _), false ->
        Test.fail
          ~__LOC__
          "IWant should be ignored at iwant count of %d."
          iwant_count
    | Some `Too_many_requests, true ->
        Test.fail
          ~__LOC__
          "IWant should be accepted at iwant count of %d."
          iwant_count
  in
  (* Issue [(2 * max_gossip_retransmission) + 10] requests, numbered from 1. *)
  let iwant_counts =
    List.init
      ~when_negative_length:()
      ((2 * limits.max_gossip_retransmission) + 10)
      (fun i -> i + 1)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  let _state = List.fold_left request state iwant_counts in
  unit
(* Tests that handling an IHave message for a subscribed topic that has not been seen
results in requesting the message.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1165
*)
let test_handle_ihave_subscribed_and_msg_not_seen rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Handle IHave for subscribed and not seen."
    ~tags:["gossipsub"; "ihave"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let advertiser = (Array.of_list all_peers).(7) in
  (* An identifier for which nothing was published or received in this test. *)
  let unseen_id = 99 in
  let _state, output =
    GS.handle_ihave
      {peer = advertiser; topic; message_ids = [unseen_id]}
      state
  in
  (* The advertised identifier is unknown to us, so it must be requested. *)
  match output with
  | Ihave_from_peer_with_low_score _ | Too_many_recv_ihave_messages _
  | Too_many_sent_iwant_messages _ | Message_topic_not_tracked
  | Invalid_message_id ->
      Test.fail ~__LOC__ "Expected to request message."
  | Message_requested_message_ids requested ->
      Check.(
        (requested = [unseen_id])
          (list int)
          ~error_msg:"Expected %R, got %L"
          ~__LOC__) ;
      unit
(* Tests that handling an IHave message for a subscribed topic that has been seen
does not result in requesting the message.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1197
*)
let test_handle_ihave_subscribed_and_msg_seen rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Handle IHave for subscribed and seen."
    ~tags:["gossipsub"; "ihave"]
  @@ fun () ->
  let topic = "topic" in
  let all_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:all_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let advertiser = (Array.of_list all_peers).(7) in
  (* Publishing the message ourselves is how it gets marked as seen. *)
  let message = "some message" in
  let message_id = 0 in
  let state, _ = GS.publish_message {topic; message_id; message} state in
  (* The peer advertises the message we have just published. *)
  let _state, output =
    GS.handle_ihave
      {peer = advertiser; topic; message_ids = [message_id]}
      state
  in
  (* The IHave must be processed, but nothing should be requested. *)
  match output with
  | Message_requested_message_ids requested ->
      Check.(
        (requested = []) (list int) ~error_msg:"Expected %R, got %L" ~__LOC__) ;
      unit
  | Ihave_from_peer_with_low_score _ | Too_many_recv_ihave_messages _
  | Too_many_sent_iwant_messages _ | Message_topic_not_tracked
  | Invalid_message_id ->
      Test.fail ~__LOC__ "Expected Message_requested_message_ids."
(* Tests that handling an IHave message for an unsubscribed topic is ignored.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L1219
*)
let test_handle_ihave_not_subscribed rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Handle IHave for unsubscribed."
    ~tags:["gossipsub"; "ihave"]
  @@ fun () ->
  let peers = make_peers ~number:(many_peers limits) in
  (* The state is initialised without any topic: [~topics:[]]. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let peers = Array.of_list peers in
  let peer = peers.(7) in
  let topic = "some random topic" in
  let message_id = 0 in
  (* Handle IHave for an unsubscribed topic. *)
  let _state, output =
    GS.handle_ihave {peer; topic; message_ids = [message_id]} state
  in
  (* IHave should result in [Message_topic_not_tracked]. Any other output is a
     failure, and the failure message names the output that was expected. *)
  match output with
  | Message_topic_not_tracked -> unit
  | Ihave_from_peer_with_low_score _ | Too_many_recv_ihave_messages _
  | Too_many_sent_iwant_messages _ | Message_requested_message_ids _
  | Invalid_message_id ->
      Test.fail ~__LOC__ "Expected Message_topic_not_tracked."
(* Tests that we start ignoring IHaves after receiving more than
[max_recv_ihave_per_heartbeat] IHaves per heartbeat.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4439
*)
let test_ignore_too_many_ihaves rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Ignore too many IHaves."
    ~tags:["gossipsub"; "ihave"]
  @@ fun () ->
  let topic = "topic" in
  let max_recv_ihave_per_heartbeat = 10 in
  let initial_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits:{limits with max_recv_ihave_per_heartbeat}
      ~parameters
      ~peers:initial_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* An extra subscribed peer, added after initialisation and never grafted
     in this test. *)
  let peer = 99 in
  let state =
    add_and_subscribe_peers [topic] [peer] ~to_subscribe:(fun _ -> true) state
  in
  (* Send a single-identifier IHave from [peer]. *)
  let send_ihave message_id state =
    GS.handle_ihave {peer; topic; message_ids = [message_id]} state
  in
  (* [2 * max_recv_ihave_per_heartbeat + 1] message ids, starting at 0. *)
  let message_ids =
    List.init
      ~when_negative_length:()
      ((2 * max_recv_ihave_per_heartbeat) + 1)
      (fun i -> i)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  (* The peer sends one IHave per message id. The first
     [max_recv_ihave_per_heartbeat] of them must trigger a request; all the
     following ones must be rejected with [Too_many_recv_ihave_messages]. *)
  let state =
    List.fold_left_i
      (fun index state message_id ->
        let state, output = send_ihave message_id state in
        let ihave_count = index + 1 in
        let over_limit = ihave_count > max_recv_ihave_per_heartbeat in
        match output with
        | Message_requested_message_ids _ ->
            if over_limit then
              Test.fail
                ~__LOC__
                "Expected [Too_many_recv_ihave_messages] at IHave count %d."
                ihave_count
            else (* Expected case. *) state
        | Too_many_recv_ihave_messages _ ->
            if over_limit then (* Expected case. *) state
            else
              Test.fail
                ~__LOC__
                "Expected [Message_requested_message_ids] at IHave count %d."
                ihave_count
        | Ihave_from_peer_with_low_score _ | Too_many_sent_iwant_messages _
        | Message_topic_not_tracked | Invalid_message_id ->
            Test.fail
              ~__LOC__
              "Expected [Too_many_recv_ihave_messages] or \
               [Message_requested_message_ids].")
      state
      message_ids
  in
  (* A heartbeat is expected to reset the per-peer IHave counter. *)
  let state, _ = GS.heartbeat state in
  (* Skip the first [max_recv_ihave_per_heartbeat] ids and keep the
     [max_recv_ihave_per_heartbeat] that follow: those were rejected above. *)
  let previously_rejected_ids =
    let _accepted, rejected =
      List.split_n max_recv_ihave_per_heartbeat message_ids
    in
    List.take_n max_recv_ihave_per_heartbeat rejected
  in
  (* Advertising them again must now trigger a request for each of them. *)
  let _state =
    List.fold_left
      (fun state message_id ->
        let state, output = send_ihave message_id state in
        match output with
        | Message_requested_message_ids _ ->
            (* Expected case *)
            state
        | Ihave_from_peer_with_low_score _ | Too_many_sent_iwant_messages _
        | Message_topic_not_tracked | Too_many_recv_ihave_messages _
        | Invalid_message_id ->
            Test.fail ~__LOC__ "Expected [Message_requested_message_ids].")
      state
      previously_rejected_ids
  in
  unit
(* Tests that only up-to [max_sent_iwant_per_heartbeat] messages are requested per heartbeat per peer.
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4513
*)
let test_ignore_too_many_messages_in_ihave rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Ignore too many messages in IHave."
    ~tags:["gossipsub"; "ihave"]
  @@ fun () ->
  (* Fail unless [output] is a request for message ids; otherwise hand the
     requested ids over to [check]. *)
  let with_requested_ids output ~check =
    match output with
    | GS.Message_requested_message_ids requested -> check requested
    | Ihave_from_peer_with_low_score _ | Too_many_recv_ihave_messages _
    | Too_many_sent_iwant_messages _ | Message_topic_not_tracked
    | Invalid_message_id ->
        Test.fail ~__LOC__ "Expected to request messages."
  in
  (* Check that [ids] holds exactly [expected] elements. *)
  let check_count ~__LOC__ ids ~expected =
    let count = List.length ids in
    Check.((count = expected) int ~error_msg:"Expected %R, got %L" ~__LOC__)
  in
  (* Check that every element of [ids] also belongs to [universe]. *)
  let check_included ~__LOC__ ids universe =
    let as_set = Message_id.Set.of_list in
    if not (Message_id.Set.subset (as_set ids) (as_set universe)) then
      Test.fail ~__LOC__ "Subset check failed."
  in
  let topic = "topic" in
  let max_sent_iwant_per_heartbeat = 10 in
  let initial_peers = make_peers ~number:(many_peers limits) in
  let state =
    init_state
      ~rng
      ~limits:
        {
          limits with
          max_sent_iwant_per_heartbeat;
          (* [max_recv_ihave_per_heartbeat] is set high so that IHaves are only
             rejected because too many messages were requested, never because
             too many IHaves were received. *)
          max_recv_ihave_per_heartbeat = 999;
        }
      ~parameters
      ~peers:initial_peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* An extra subscribed peer, added after initialisation and never grafted
     in this test. *)
  let peer = 99 in
  let state =
    add_and_subscribe_peers [topic] [peer] ~to_subscribe:(fun _ -> true) state
  in
  (* [2 * max_sent_iwant_per_heartbeat] (i.e. 20) message ids, from 0. *)
  let message_ids =
    List.init
      ~when_negative_length:()
      (2 * max_sent_iwant_per_heartbeat)
      (fun i -> i)
    |> WithExceptions.Result.get_ok ~loc:__LOC__
  in
  (* First IHave: ids 0..7. Every one of them must be requested. *)
  let first_8 = List.take_n 8 message_ids in
  let state, output =
    GS.handle_ihave {peer; topic; message_ids = first_8} state
  in
  let sorted = List.sort Message_id.compare in
  with_requested_ids output ~check:(fun ids ->
      Check.(
        (sorted ids = sorted first_8)
          (list int)
          ~error_msg:"Expected %R, got %L"
          ~__LOC__)) ;
  (* Second IHave: ids 0..11. Eight requests were already sent, so only 2 more
     fit under [max_sent_iwant_per_heartbeat]. *)
  let first_12 = List.take_n 12 message_ids in
  let state, output =
    GS.handle_ihave {peer; topic; message_ids = first_12} state
  in
  with_requested_ids output ~check:(fun ids ->
      check_count ~__LOC__ ids ~expected:2 ;
      check_included ~__LOC__ ids first_12) ;
  (* Third IHave: ids 0..19. The request budget is exhausted, so the output
     must be [Too_many_sent_iwant_messages]. *)
  let _state, output = GS.handle_ihave {peer; topic; message_ids} state in
  (match output with
  | Too_many_sent_iwant_messages _ ->
      (* Expected case. *)
      ()
  | Message_requested_message_ids _ | Ihave_from_peer_with_low_score _
  | Too_many_recv_ihave_messages _ | Message_topic_not_tracked
  | Invalid_message_id ->
      Test.fail ~__LOC__ "Expected [Too_many_sent_iwant_messages].") ;
  (* A heartbeat is expected to reset the request counter. Note that it is
     run on the state that precedes the third IHave, as that IHave's state
     was discarded. *)
  let state, _ = GS.heartbeat state in
  (* Advertising all 20 ids again must request the 10 remaining messages. *)
  let _state, output = GS.handle_ihave {peer; topic; message_ids} state in
  with_requested_ids output ~check:(fun ids ->
      check_count ~__LOC__ ids ~expected:10 ;
      check_included ~__LOC__ ids message_ids) ;
  unit
(* Check the following scenario:
* we joined a topic and we have a unique peer in the mesh
* the peer has a negative score
* the heartbeat removes that peer
* a new peer subscribes to the same topic
* a new heartbeat adds the new peer to the mesh
*)
let test_heartbeat_scenario rng limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: simple heartbeat scenario"
    ~tags:["gossipsub"; "heartbeat"]
  @@ fun () ->
  let topic = "topic" in
  (* Flatten a peer-to-topics map into a list of [(peer, topics)] pairs. *)
  let to_assoc_list topics_per_peer =
    List.map
      (fun (peer, topics) -> (peer, Topic.Set.elements topics))
      (Peer.Map.bindings topics_per_peer)
  in
  (* Check that [topics_per_peer] binds exactly [peer] to [[topic]]. *)
  let check_only ~__LOC__ peer topics_per_peer =
    Check.(
      (to_assoc_list topics_per_peer = [(peer, [topic])])
        (list (tuple2 int (list string)))
        ~error_msg:"Expected %R, got %L"
        ~__LOC__)
  in
  (* Connect [peer] as a non-direct inbound peer, then let it subscribe to
     [topic], checking both outputs along the way. *)
  let connect_and_subscribe ~__LOC__ peer s =
    let s, output = GS.add_peer {direct = false; outbound = false; peer} s in
    assert_output ~__LOC__ output Peer_added ;
    let s, output = GS.handle_subscribe {topic; peer} s in
    assert_output ~__LOC__ output Subscribed ;
    s
  in
  (* A first peer connects and subscribes; joining the topic must graft it. *)
  let peer = 0 in
  let s = GS.make rng limits parameters in
  let s = connect_and_subscribe ~__LOC__ peer s in
  let s, output = GS.join {topic} s in
  (match output with
  | Joining_topic {to_graft} ->
      if Peer.Set.elements to_graft = [peer] then ()
      else Test.fail ~__LOC__ "Unexpected Join output."
  | _ -> Test.fail ~__LOC__ "Unexpected Join output.") ;
  assert_mesh_size ~__LOC__ ~topic ~expected_size:1 s ;
  (* The peer gets a negative application score: the next heartbeat must
     prune it and leave the mesh empty. *)
  let s, GS.Set_application_score =
    GS.set_application_score ({peer; score = -1.0} : GS.set_application_score) s
  in
  let s, Heartbeat {to_prune; _} = GS.heartbeat s in
  check_only ~__LOC__ peer to_prune ;
  assert_mesh_size ~__LOC__ ~topic ~expected_size:0 s ;
  (* A second peer connects and subscribes: the next heartbeat must graft it. *)
  let peer = 1 in
  let s = connect_and_subscribe ~__LOC__ peer s in
  let s, Heartbeat {to_graft; _} = GS.heartbeat s in
  check_only ~__LOC__ peer to_graft ;
  assert_mesh_size ~__LOC__ ~topic ~expected_size:1 s ;
  unit
(** Test for P1 (Time in Mesh).
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3166 *)
let test_scoring_p1 rng _limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Scoring P1"
    ~tags:["gossipsub"; "scoring"; "p1"]
  @@ fun () ->
  let time_in_mesh_quantum = Milliseconds.of_int_ms 50 in
  let time_in_mesh_weight = 2.0 in
  let time_in_mesh_cap = 10.0 in
  let limits =
    Default_limits.default_limits
      ~time_in_mesh_weight
      ~time_in_mesh_quantum
      ~time_in_mesh_cap
      ()
  in
  let topic = "topic" in
  let peers = make_peers ~number:1 in
  let peer = Stdlib.List.hd peers in
  (* Let [quanta] mesh quanta elapse, then run a heartbeat to refresh the
     scores; the refreshed state is returned. *)
  let wait_and_refresh quanta state =
    Time.elapse (GS.Span.mul time_in_mesh_quantum quanta) ;
    fst (GS.heartbeat state)
  in
  (* The state is initialised with a single peer subscribed to [topic]. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* After 2 quanta, the score should be 2 * time_in_mesh_weight. *)
  let state = wait_and_refresh 2 state in
  assert_peer_score
    ~__LOC__
    ~expected_score:(time_in_mesh_weight *. 2.0)
    peer
    state ;
  (* After 2 more quanta, the score should be 4 * time_in_mesh_weight. *)
  let state = wait_and_refresh 2 state in
  assert_peer_score
    ~__LOC__
    ~expected_score:(time_in_mesh_weight *. 4.0)
    peer
    state ;
  (* Wait for more quanta than the cap allows to count. *)
  let beyond_cap = Float.to_int (time_in_mesh_cap +. 10.0) in
  let state = wait_and_refresh beyond_cap state in
  (* The score should be exactly time_in_mesh_cap * time_in_mesh_weight. *)
  assert_peer_score
    ~__LOC__
    ~expected_score:(time_in_mesh_weight *. time_in_mesh_cap)
    peer
    state ;
  unit
(** Test for P2 (First Message Deliveries).
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3247 *)
let test_scoring_p2 rng _limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Scoring P2"
    ~tags:["gossipsub"; "scoring"; "p2"]
  @@ fun () ->
  let first_message_deliveries_weight = 2.0 in
  let first_message_deliveries_cap = 10 in
  let first_message_deliveries_decay = 0.9 in
  let limits =
    Default_limits.default_limits
      ~time_in_mesh_weight:0.0 (* deactivate time in mesh *)
      ~first_message_deliveries_weight
      ~first_message_deliveries_cap
      ~first_message_deliveries_decay
      ()
  in
  let topic = "topic" in
  let peer_list = make_peers ~number:2 in
  (* The state is initialised with two peers subscribed to [topic]. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers:peer_list
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  let peers = Array.of_list peer_list in
  (* Deliver [message] from [sender]. The output must be either a routing
     decision or a notice that the message was already received. *)
  let deliver ~__LOC__ sender (message_id, message) state =
    match
      GS.handle_receive_message {sender; topic; message_id; message} state
    with
    | state, (GS.Route_message _ | Already_received) -> state
    | _ ->
        Test.fail
          ~__LOC__
          "Output should have been Route_message or Already_received"
  in
  (* Same as [deliver], with a freshly generated message. *)
  let deliver_fresh ~__LOC__ sender state =
    deliver ~__LOC__ sender (gen_message ()) state
  in
  (* One message, delivered by peer 0 first and by peer 1 second. *)
  let shared_message = gen_message () in
  let state = deliver ~__LOC__ peers.(0) shared_message state in
  let state = deliver ~__LOC__ peers.(1) shared_message state in
  (* Peer 0 should score exactly first_message_deliveries_weight... *)
  assert_peer_score
    ~__LOC__
    ~expected_score:first_message_deliveries_weight
    peers.(0)
    state ;
  (* ... while peer 1 should stay at 0. *)
  assert_peer_score ~__LOC__ ~expected_score:0.0 peers.(1) state ;
  (* Peer 1 now delivers two new messages. *)
  let state = deliver_fresh ~__LOC__ peers.(1) state in
  let state = deliver_fresh ~__LOC__ peers.(1) state in
  (* Its score should be exactly 2 * first_message_deliveries_weight. *)
  assert_peer_score
    ~__LOC__
    ~expected_score:(2.0 *. first_message_deliveries_weight)
    peers.(1)
    state ;
  (* Decay: a heartbeat refreshes the scores. *)
  let state, _ = GS.heartbeat state in
  (* Peer 0 should score exactly
     first_message_deliveries_decay * first_message_deliveries_weight. *)
  assert_peer_score
    ~__LOC__
    ~expected_score:
      (first_message_deliveries_decay *. first_message_deliveries_weight)
    peers.(0)
    state ;
  (* Peer 1 should score exactly
     2 * first_message_deliveries_decay * first_message_deliveries_weight. *)
  assert_peer_score
    ~__LOC__
    ~expected_score:
      (2. *. first_message_deliveries_decay *. first_message_deliveries_weight)
    peers.(1)
    state ;
  (* Cap: peer 1 delivers first_message_deliveries_cap more messages, all of
     them generated before the first one is delivered. *)
  let fresh_messages =
    Stdlib.List.init first_message_deliveries_cap (fun _i -> gen_message ())
  in
  let state =
    List.fold_left
      (fun state fresh -> deliver ~__LOC__ peers.(1) fresh state)
      state
      fresh_messages
  in
  (* Peer 1 should score exactly
     first_message_deliveries_cap * first_message_deliveries_weight. *)
  assert_peer_score
    ~__LOC__
    ~expected_score:
      (float_of_int first_message_deliveries_cap
      *. first_message_deliveries_weight)
    peers.(1)
    state ;
  unit
(** Test for P4 (Invalid Messages).
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3895
and https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3979 *)
let test_scoring_p4 rng _limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Scoring P4"
    ~tags:["gossipsub"; "scoring"; "p4"]
  @@ fun () ->
  let invalid_message_deliveries_weight = -2.0 in
  let invalid_message_deliveries_decay = 0.9 in
  let limits =
    Default_limits.default_limits
      ~time_in_mesh_weight:0.0 (* deactivate time in mesh *)
      ~first_message_deliveries_weight:0.0
        (* deactivate first time deliveries *)
      ~mesh_message_deliveries_weight:0.0 (* deactivate message deliveries *)
      ~mesh_failure_penalty_weight:0.0 (* deactivate mesh failure penalties *)
      ~invalid_message_deliveries_weight
      ~invalid_message_deliveries_decay
      ()
  in
  let peers = make_peers ~number:1 in
  let peer = Stdlib.List.hd peers in
  let topic = "topic" in
  (* The state is initialised with a single peer subscribed to [topic]. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> true)
      ()
  in
  (* We remember the old validity function so we can recover it later.
     We must recover the validity function as it is shared between the tests.
     The restoration is done in a [~finally] handler so that it also happens
     when one of the assertions below raises. *)
  let validity_function_before = !Validity_hook.validity in
  Stdlib.Fun.protect ~finally:(fun () ->
      Validity_hook.set validity_function_before)
  @@ fun () ->
  (* set validity function so all messages are invalid *)
  Validity_hook.set (fun _ _ -> `Invalid) ;
  (* peer sends us three invalid messages *)
  let invalid_messages = 3 in
  let state =
    Stdlib.List.init invalid_messages (fun _i -> ())
    |> List.fold_left
         (fun state () ->
           let message_id, message = gen_message () in
           let state, _ =
             GS.handle_receive_message
               {sender = peer; topic; message_id; message}
               state
           in
           state)
         state
  in
  (* Expected score for a (possibly decayed) invalid-message counter:
     invalid_message_deliveries_weight * counter^2 *)
  let expected_score counter =
    invalid_message_deliveries_weight *. counter *. counter
  in
  assert_peer_score
    ~__LOC__
    ~expected_score:(expected_score (float_of_int invalid_messages))
    peer
    state ;
  (* test decaying *)
  (* refresh scores *)
  let state, _ = GS.heartbeat state in
  (* the number of invalids gets decayed by
     [invalid_message_deliveries_decay] and then squared in the score *)
  assert_peer_score
    ~__LOC__
    ~expected_score:
      (expected_score
         (float_of_int invalid_messages *. invalid_message_deliveries_decay))
    peer
    state ;
  unit
(** Test for P5 (Application-Specific).

    Sets an application score of [1.1] on a single peer, with
    [app_specific_weight] set to [2.0], and checks that the peer score is the
    product of the two.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4049 *)
let test_scoring_p5 rng _limits parameters =
  let scenario () =
    let weight = 2.0 in
    let application_score = 1.1 in
    let limits = Default_limits.default_limits ~app_specific_weight:weight () in
    let peers = make_peers ~number:1 in
    let scored_peer = Stdlib.List.hd peers in
    (* State built from the single peer and a single topic. *)
    let initial_state =
      init_state
        ~rng
        ~limits
        ~parameters
        ~peers
        ~topics:["topic"]
        ~to_subscribe:(fun _ -> true)
        ()
    in
    let scored_state, _ =
      GS.set_application_score
        {peer = scored_peer; score = application_score}
        initial_state
    in
    (* Expected score: app_specific_weight * application score. *)
    assert_peer_score
      ~__LOC__
      ~expected_score:(weight *. application_score)
      scored_peer
      scored_state ;
    unit
  in
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Scoring P5"
    ~tags:["gossipsub"; "scoring"; "p5"]
    scenario
(** Test for P7 (Behavioural Penalty - Grafts before backoff).

    Two peers are first given a prune backoff. The first one grafts again
    before [graft_flood_threshold] has elapsed, the second one just after it.
    The test checks the resulting behaviour penalties, and checks them again
    after one heartbeat to cover the decay.

    Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L4203 *)
let test_scoring_p7_grafts_before_backoff rng _limits parameters =
  Tezt_core.Test.register
    ~__FILE__
    ~title:"Gossipsub: Scoring P7 grafts before backoff"
    ~tags:["gossipsub"; "scoring"; "p7"; "graft"]
  @@ fun () ->
  let behaviour_penalty_weight = -2.0 in
  let behaviour_penalty_decay = 0.9 in
  let graft_flood_threshold = Milliseconds.of_int_ms 100 in
  let prune_backoff = Milliseconds.of_int_ms 200 in
  let base_limits =
    Default_limits.default_limits
      ~behaviour_penalty_weight
      ~behaviour_penalty_decay
      ()
  in
  let limits = {base_limits with prune_backoff; graft_flood_threshold} in
  let topic = "topic" in
  let peers = make_peers ~number:2 in
  (* Initial state built from the two peers and the single topic. *)
  let state =
    init_state
      ~rng
      ~limits
      ~parameters
      ~peers
      ~topics:[topic]
      ~to_subscribe:(fun _ -> false)
      ()
  in
  let peer_array = Array.of_list peers in
  let first_peer = peer_array.(0) in
  let second_peer = peer_array.(1) in
  (* State obtained after handling a graft from [peer] on [topic]. *)
  let graft_from peer state = fst (GS.handle_graft {peer; topic} state) in
  (* State obtained after setting the application score of [peer]. *)
  let with_app_score peer score state =
    fst (GS.set_application_score {peer; score} state)
  in
  (* Introduce prune backoff to [peer] by setting the [peer] score to negative
     and handling a graft from the [peer]; the application score is then put
     back to zero. *)
  let trigger_prune_backoff state peer =
    state
    |> with_app_score peer (-99.0)
    |> graft_from peer
    |> with_app_score peer 0.0
  in
  (* Expected score for a given behaviour penalty counter: the counter is
     squared, then multiplied by the weight. *)
  let penalty_score counter =
    counter *. counter *. behaviour_penalty_weight
  in
  (* Add prune backoff to both peers, first peer first. *)
  let state =
    List.fold_left trigger_prune_backoff state [first_peer; second_peer]
  in
  (* Wait [graft_flood_threshold - 1] ms. *)
  let just_below_threshold =
    Milliseconds.(sub graft_flood_threshold (of_int_ms 1))
  in
  Time.elapse just_below_threshold ;
  (* First peer tries to graft. *)
  let state = graft_from first_peer state in
  (* Since the time since last prune is below [graft_flood_threshold],
     the first peer should have double behaviour penalty (squared). *)
  assert_peer_score
    ~__LOC__
    ~expected_score:(penalty_score 2.0)
    first_peer
    state ;
  (* Wait for 2 more ms.
     Total time passed since last prune is now [graft_flood_threshold + 1]. *)
  Time.elapse (Milliseconds.of_int_ms 2) ;
  (* Second peer tries to graft. *)
  let state = graft_from second_peer state in
  (* Since the time since last prune is above [graft_flood_threshold],
     the second peer should have a single behaviour penalty (squared). *)
  assert_peer_score
    ~__LOC__
    ~expected_score:(penalty_score 1.0)
    second_peer
    state ;
  (* Test decay: after one heartbeat each counter is multiplied by
     [behaviour_penalty_decay] before being squared. *)
  let state, _ = GS.heartbeat state in
  assert_peer_score
    ~__LOC__
    ~expected_score:(penalty_score (2.0 *. behaviour_penalty_decay))
    first_peer
    state ;
  assert_peer_score
    ~__LOC__
    ~expected_score:(penalty_score (1.0 *. behaviour_penalty_decay))
    second_peer
    state ;
  unit
(* TODO: https://gitlab.com/tezos/tezos/-/issues/5293
   Add a test for the described scenario. *)
(** [register rng limits parameters] registers every unit test of this file,
    in the order of the list below, passing the same [rng], [limits] and
    [parameters] to each of them. *)
let register rng limits parameters =
  List.iter
    (fun register_test -> register_test rng limits parameters)
    [
      test_ignore_graft_from_unknown_topic;
      test_handle_received_subscriptions;
      test_join_adds_peers_to_mesh;
      test_join_adds_fanout_to_mesh;
      test_publish_without_flood_publishing;
      test_receiving_message_for_unsusbcribed_topic;
      test_receiving_message;
      test_fanout;
      test_handle_graft_for_joined_topic;
      test_handle_graft_for_not_joined_topic;
      test_handle_graft_from_unsubscribed_peer;
      test_handle_prune_peer_in_mesh;
      test_mesh_addition;
      test_mesh_subtraction;
      test_do_not_graft_within_backoff_period;
      test_unsubscribe_backoff;
      test_accept_only_outbound_peer_grafts_when_mesh_full;
      test_do_not_remove_too_many_outbound_peers;
      test_add_outbound_peers_if_min_is_not_satisfied;
      test_handle_iwant_msg_cached;
      test_handle_iwant_msg_cached_shifted;
      test_handle_iwant_msg_not_cached;
      test_ignore_too_many_iwants_from_same_peer_for_same_message;
      test_handle_ihave_subscribed_and_msg_not_seen;
      test_handle_ihave_subscribed_and_msg_seen;
      test_handle_ihave_not_subscribed;
      test_ignore_too_many_ihaves;
      test_ignore_too_many_messages_in_ihave;
      test_heartbeat_scenario;
      test_scoring_p1;
      test_scoring_p2;
      test_scoring_p4;
      test_scoring_p5;
      test_scoring_p7_grafts_before_backoff;
    ]