https://gitlab.com/tezos/tezos
Tip revision: e5df8d3840ea4022271fa14f27645aa6c7bc9dd3 authored by Alain Mebsout on 15 March 2024, 16:39:29 UTC
Doc: changelog
Doc: changelog
Tip revision: e5df8d3
p2p.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019-2022 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
module Events = P2p_events.P2p
type config = {
listening_port : P2p_addr.port option;
listening_addr : P2p_addr.t option;
advertised_port : P2p_addr.port option;
discovery_port : P2p_addr.port option;
discovery_addr : Ipaddr.V4.t option;
trusted_points : (P2p_point.Id.t * P2p_peer.Id.t option) list;
peers_file : string;
private_mode : bool;
identity : P2p_identity.t;
proof_of_work_target : Tezos_crypto.Crypto_box.pow_target;
trust_discovered_peers : bool;
reconnection_config : Point_reconnection_config.t;
disable_peer_discovery : bool;
}
let create_scheduler limits =
let open P2p_limits in
let max_upload_speed = Option.map (( * ) 1024) limits.max_upload_speed in
let max_download_speed = Option.map (( * ) 1024) limits.max_download_speed in
P2p_io_scheduler.create
~read_buffer_size:limits.read_buffer_size
?max_upload_speed
?max_download_speed
?read_queue_size:limits.read_queue_size
?write_queue_size:limits.write_queue_size
()
let create_connection_pool config limits meta_cfg log triggers =
let open P2p_limits in
let pool_cfg =
{
P2p_pool.identity = config.identity;
trusted_points = config.trusted_points;
peers_file = config.peers_file;
private_mode = config.private_mode;
max_known_points = limits.max_known_points;
max_known_peer_ids = limits.max_known_peer_ids;
peer_greylist_size = limits.peer_greylist_size;
ip_greylist_size_in_kilobytes = limits.ip_greylist_size_in_kilobytes;
ip_greylist_cleanup_delay = limits.ip_greylist_cleanup_delay;
}
in
P2p_pool.create pool_cfg meta_cfg ~log triggers
let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched
triggers log answerer =
let open P2p_limits in
let connect_handler_cfg =
{
P2p_connect_handler.identity = config.identity;
proof_of_work_target = config.proof_of_work_target;
listening_port = config.listening_port;
advertised_port = config.advertised_port;
private_mode = config.private_mode;
reconnection_config = config.reconnection_config;
min_connections = limits.min_connections;
max_connections = limits.max_connections;
max_incoming_connections = limits.max_incoming_connections;
connection_timeout = limits.connection_timeout;
authentication_timeout = limits.authentication_timeout;
incoming_app_message_queue_size = limits.incoming_app_message_queue_size;
incoming_message_queue_size = limits.incoming_message_queue_size;
outgoing_message_queue_size = limits.outgoing_message_queue_size;
binary_chunks_size = limits.binary_chunks_size;
disable_peer_discovery = config.disable_peer_discovery;
}
in
P2p_connect_handler.create
connect_handler_cfg
pool
msg_cfg
conn_meta_cfg
io_sched
triggers
~log
~answerer
let may_create_discovery_worker _limits config pool =
match
(config.listening_port, config.discovery_port, config.discovery_addr)
with
| Some listening_port, Some discovery_port, Some discovery_addr ->
Some
(P2p_discovery.create
pool
config.identity.peer_id
~listening_port
~discovery_port
~discovery_addr
~trust_discovered_peers:config.trust_discovered_peers)
| _, _, _ -> None
let create_maintenance_worker limits pool connect_handler config triggers log =
let open P2p_limits in
let open Lwt_syntax in
match limits.maintenance_idle_time with
| None -> return_none
| Some maintenance_idle_time ->
let maintenance_config =
{
P2p_maintenance.maintenance_idle_time;
private_mode = config.private_mode;
min_connections = limits.min_connections;
max_connections = limits.max_connections;
expected_connections = limits.expected_connections;
time_between_looking_for_peers =
Ptime.Span.of_int_s 5
(* Empirical value. Enough to observe changes in the network,
and not too long to discover new peers quickly. *)
(* TODO: https://gitlab.com/tezos/tezos/-/issues/1655
Check whether the value is optimal or not through integration tests
*);
}
in
let discovery = may_create_discovery_worker limits config pool in
return_some
(P2p_maintenance.create
?discovery
maintenance_config
pool
connect_handler
triggers
~log)
let may_create_welcome_worker config limits connect_handler =
config.listening_port
|> Option.map_es (fun port ->
P2p_welcome.create
~backlog:limits.P2p_limits.backlog
connect_handler
?addr:config.listening_addr
port)
type ('msg, 'peer_meta, 'conn_meta) connection =
('msg, 'peer_meta, 'conn_meta) P2p_conn.t
module Real = struct
type ('msg, 'peer_meta, 'conn_meta) net = {
config : config;
limits : P2p_limits.t;
io_sched : P2p_io_scheduler.t;
pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t;
connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t;
maintenance : ('msg, 'peer_meta, 'conn_meta) P2p_maintenance.t option;
welcome : P2p_welcome.t option;
watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
triggers : P2p_trigger.t;
received_msg_hook :
('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit;
sent_msg_hook : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit;
broadcasted_msg_hook :
('msg, 'peer_meta, 'conn_meta) connection P2p_peer.Table.t ->
?except:(('msg, 'peer_meta, 'conn_meta) connection -> bool) ->
?alt:(('msg, 'peer_meta, 'conn_meta) connection -> bool) * 'msg ->
'msg ->
unit;
}
let create ~config ~limits ?received_msg_hook ?sent_msg_hook
?broadcasted_msg_hook meta_cfg msg_cfg conn_meta_cfg =
let open Lwt_result_syntax in
let io_sched = create_scheduler limits in
let watcher = Lwt_watcher.create_input () in
let log event = Lwt_watcher.notify watcher event in
let triggers = P2p_trigger.create () in
let*! pool = create_connection_pool config limits meta_cfg log triggers in
(* There is a mutual recursion between an answerer and connect_handler,
for the default answerer. Because of the swap request mechanism, the
default answerer needs to initiate new connections using the
[P2p_connect_handler.connect] callback. *)
let rec answerer =
lazy
(if config.private_mode then P2p_protocol.create_private ()
else
let connect =
P2p_connect_handler.connect (Lazy.force connect_handler)
in
let proto_conf =
{
P2p_protocol.swap_linger = limits.P2p_limits.swap_linger;
pool;
log;
connect;
latest_accepted_swap = Ptime.epoch;
latest_successful_swap = Ptime.epoch;
}
in
P2p_protocol.create_default proto_conf)
and connect_handler =
lazy
(create_connect_handler
config
limits
pool
msg_cfg
conn_meta_cfg
io_sched
triggers
log
answerer)
in
let connect_handler = Lazy.force connect_handler in
let*! maintenance =
create_maintenance_worker limits pool connect_handler config triggers log
in
let* welcome = may_create_welcome_worker config limits connect_handler in
P2p_metrics_collectors.collect pool io_sched ;
return
{
config;
limits;
io_sched;
pool;
connect_handler;
maintenance;
welcome;
watcher;
triggers;
received_msg_hook =
Option.value ~default:(fun _ _ -> ()) received_msg_hook;
sent_msg_hook = Option.value ~default:(fun _ _ -> ()) sent_msg_hook;
broadcasted_msg_hook =
Option.value
~default:(fun _ ?except:_ ?alt:_ _ -> ())
broadcasted_msg_hook;
}
let peer_id {config; _} = config.identity.peer_id
let maintain {maintenance; _} () =
let open Lwt_result_syntax in
match maintenance with
| Some maintenance ->
let*! () = P2p_maintenance.maintain maintenance in
return_unit
| None -> tzfail P2p_errors.Maintenance_disabled
let activate t () =
Events.(emit__dont_wait__use_with_care activate_network)
t.config.identity.peer_id ;
(match t.welcome with None -> () | Some w -> P2p_welcome.activate w) ;
match t.maintenance with
| Some maintenance -> P2p_maintenance.activate maintenance
| None -> ()
(* TODO: https://gitlab.com/tezos/tezos/-/issues/4597
Implement [roll] function. *)
let roll _net () = Lwt.return_unit
(* returns when all workers have shut down in the opposite
creation order. *)
let shutdown net () =
let open Lwt_syntax in
let* () = Events.(emit shutdown_welcome_worker) () in
let* () = Option.iter_s P2p_welcome.shutdown net.welcome in
let* () = Events.(emit shutdown_maintenance_worker) () in
let* () =
Option.iter_s
(fun maintenance -> P2p_maintenance.shutdown maintenance)
net.maintenance
in
let* () = Events.(emit shutdown_connection_pool) () in
let* () = P2p_pool.destroy net.pool in
let* () = Events.(emit shutdown_connection_handler) () in
let* () = P2p_connect_handler.destroy net.connect_handler in
let* () = Events.(emit shutdown_scheduler) () in
P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched
let connections {pool; _} () =
P2p_pool.Connection.fold pool ~init:[] ~f:(fun _peer_id c acc -> c :: acc)
let find_connection_by_peer_id {pool; _} peer_id =
P2p_pool.Connection.find_by_peer_id pool peer_id
let find_connection_by_point {pool; _} point =
P2p_pool.Connection.find_by_point pool point
let disconnect ?wait ~reason = P2p_conn.disconnect ?wait ~reason:(User reason)
let connection_info _net conn = P2p_conn.info conn
let connection_local_metadata _net conn = P2p_conn.local_metadata conn
let connection_remote_metadata _net conn = P2p_conn.remote_metadata conn
let connection_stat _net conn = P2p_conn.stat conn
let global_stat {connect_handler; _} () =
P2p_connect_handler.stat connect_handler
let set_peer_metadata {pool; _} conn meta =
P2p_pool.Peers.set_peer_metadata pool conn meta
let get_peer_metadata {pool; _} conn =
P2p_pool.Peers.get_peer_metadata pool conn
let connect ?trusted ?expected_peer_id ?timeout net point =
P2p_connect_handler.connect
?trusted
?expected_peer_id
?timeout
net.connect_handler
point
let recv net conn =
let open Lwt_syntax in
let* msg = P2p_conn.read conn in
let peer_id = (P2p_conn.info conn).peer_id in
let* () =
match msg with
| Ok msg ->
let* () = Events.(emit message_read) peer_id in
net.received_msg_hook conn msg ;
return_unit
| Error _ -> Events.(emit message_read_error) peer_id
in
return msg
let rec recv_any net () =
let open Lwt_syntax in
let pipes =
P2p_pool.Connection.fold net.pool ~init:[] ~f:(fun _peer_id conn acc ->
(let* r = P2p_conn.is_readable conn in
match r with
| Ok () -> Lwt.return_some conn
| Error _ -> Lwt_utils.never_ending ())
:: acc)
in
let new_connection =
let* () = P2p_trigger.wait_new_connection net.triggers in
Lwt.return_none
in
let* o = Lwt.pick (new_connection :: pipes) in
match o with
| None -> recv_any net ()
| Some conn -> (
let* r = recv net conn in
match r with
| Ok msg ->
net.received_msg_hook conn msg ;
Lwt.return (conn, msg)
| Error _ ->
let* () = Lwt.pause () in
recv_any net ())
let send net conn m =
let open Lwt_result_syntax in
let*! r = P2p_conn.write conn m in
let*! () =
match r with
| Ok () ->
let peer_id = (P2p_conn.info conn).peer_id in
let*! () = Events.(emit message_sent) peer_id in
net.sent_msg_hook conn m ;
Lwt.return_unit
| Error trace ->
Events.(emit sending_message_error)
((P2p_conn.info conn).peer_id, trace)
in
Lwt.return r
let try_send net conn m =
match P2p_conn.write_now conn m with
| Ok v ->
Events.(emit__dont_wait__use_with_care message_trysent)
((P2p_conn.info conn).peer_id, v) ;
if v then net.sent_msg_hook conn m ;
v
| Error err ->
Events.(emit__dont_wait__use_with_care trysending_message_error)
((P2p_conn.info conn).peer_id, err) ;
false
(* Memoization of broadcast encoded [msg] inside a buffer [buf]. *)
(* For generalisation purposes, each connection has a `writer` that
defines a specific encoding for messages. Currently we use the
same encoding for every connection. It makes this simple
memoization possible but will need modifications if connections
have specialised encodings. *)
let broadcast_encode conn buff msg =
let open Result_syntax in
match !buff with
| None ->
let* encoded_msg = P2p_conn.encode conn msg in
buff := Some encoded_msg ;
return encoded_msg
| Some em -> return em
let send_conn ?alt conn buf alt_buf msg =
let open Result_syntax in
(* Silently discards Error P2p_errors.Connection_closed in case
the pipe is closed. Shouldn't happen because
- no race conditions (no Lwt)
- the peer state is Running.
Also ignore if the message is dropped instead of being added
to the write queue. *)
(* TODO: https://gitlab.com/tezos/tezos/-/issues/4205
Ensure sent messages are actually sent.
*)
ignore
@@ let* encoded_msg =
match alt with
| None -> broadcast_encode conn buf msg
| Some (if_conn, then_msg) ->
if if_conn conn then broadcast_encode conn alt_buf then_msg
else broadcast_encode conn buf msg
in
P2p_conn.write_encoded_now
conn
(P2p_socket.copy_encoded_message encoded_msg)
let raw_broadcast connections ?except ?alt msg =
let buf = ref None in
let alt_buf = ref None in
let send conn = send_conn ?alt conn buf alt_buf msg in
P2p_peer.Table.iter
(fun _peer_id conn ->
match except with
| None -> send conn
| Some f when not (f conn) -> send conn
| _ -> ())
connections ;
Events.(emit__dont_wait__use_with_care broadcast) ()
let broadcast net connections ?except ?alt msg =
raw_broadcast connections ?except ?alt msg ;
net.broadcasted_msg_hook connections ?except ?alt msg
let fold_connections {pool; _} ~init ~f =
P2p_pool.Connection.fold pool ~init ~f
let iter_connections {pool; _} f =
P2p_pool.Connection.fold pool ~init:() ~f:(fun gid conn () -> f gid conn)
let on_new_connection {connect_handler; _} f =
P2p_connect_handler.on_new_connection connect_handler f
let on_disconnection {connect_handler; _} f =
P2p_connect_handler.on_disconnection connect_handler f
let negotiated_version _ conn = P2p_conn.negotiated_version conn
end
module Fake = struct
let id = P2p_identity.generate_with_pow_target_0 ()
let empty_stat =
{
P2p_stat.total_sent = 0L;
total_recv = 0L;
current_inflow = 0;
current_outflow = 0;
}
let connection_info announced_version faked_metadata =
{
P2p_connection.Info.incoming = false;
peer_id = id.peer_id;
id_point = (Ipaddr.V6.unspecified, None);
remote_socket_port = 0;
announced_version;
local_metadata = faked_metadata;
remote_metadata = faked_metadata;
private_node = false;
}
end
type ('msg, 'peer_meta, 'conn_meta) t = {
announced_version : Network_version.t;
peer_id : P2p_peer.Id.t;
maintain : unit -> unit tzresult Lwt.t;
roll : unit -> unit Lwt.t;
shutdown : unit -> unit Lwt.t;
connections : unit -> ('msg, 'peer_meta, 'conn_meta) connection list;
find_connection_by_peer_id :
P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
find_connection_by_point :
P2p_point.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
disconnect :
?wait:bool ->
reason:string ->
('msg, 'peer_meta, 'conn_meta) connection ->
unit Lwt.t;
connection_info :
('msg, 'peer_meta, 'conn_meta) connection ->
'conn_meta P2p_connection.Info.t;
connection_local_metadata :
('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
connection_remote_metadata :
('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t;
global_stat : unit -> P2p_stat.t;
get_peer_metadata : P2p_peer.Id.t -> 'peer_meta;
set_peer_metadata : P2p_peer.Id.t -> 'peer_meta -> unit;
connect :
?trusted:bool ->
?expected_peer_id:P2p_peer.Id.t ->
?timeout:Ptime.span ->
P2p_point.Id.t ->
('msg, 'peer_meta, 'conn_meta) connection tzresult Lwt.t;
recv : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg tzresult Lwt.t;
recv_any : unit -> (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t;
send :
('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit tzresult Lwt.t;
try_send : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> bool;
broadcast :
('msg, 'peer_meta, 'conn_meta) connection P2p_peer.Table.t ->
?except:(('msg, 'peer_meta, 'conn_meta) connection -> bool) ->
?alt:(('msg, 'peer_meta, 'conn_meta) connection -> bool) * 'msg ->
'msg ->
unit;
pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t option;
connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t option;
fold_connections :
'a.
init:'a ->
f:(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) ->
'a;
iter_connections :
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
on_new_connection :
(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
on_disconnection : (P2p_peer.Id.t -> unit) -> unit;
negotiated_version :
('msg, 'peer_meta, 'conn_meta) connection -> Network_version.t;
activate : unit -> unit;
watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
}
type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t
let announced_version net = net.announced_version
let pool net = net.pool
let connect_handler net = net.connect_handler
let check_limits =
let open Result_syntax in
let open P2p_limits in
let fail_1 v orig =
if not (Ptime.Span.compare v Ptime.Span.zero <= 0) then return_unit
else
Error_monad.error_with
"value of option %S cannot be negative or null@."
orig
in
let fail_2 v orig =
if not (v < 0) then return_unit
else Error_monad.error_with "value of option %S cannot be negative@." orig
in
fun c ->
let* () = fail_1 c.authentication_timeout "authentication-timeout" in
let* () = fail_2 c.min_connections "min-connections" in
let* () = fail_2 c.expected_connections "expected-connections" in
let* () = fail_2 c.max_connections "max-connections" in
let* () = fail_2 c.max_incoming_connections "max-incoming-connections" in
let* () = fail_2 c.read_buffer_size "read-buffer-size" in
let* () =
match c.swap_linger with
| Some swap_linger -> fail_1 swap_linger "swap-linger"
| None -> return_unit
in
let* () =
match c.binary_chunks_size with
| None -> return_unit
| Some size -> P2p_socket.check_binary_chunks_size size
in
return_unit
let create ~config ~limits ?received_msg_hook ?sent_msg_hook
?broadcasted_msg_hook peer_cfg conn_cfg msg_cfg =
let open Lwt_result_syntax in
let*? () = check_limits limits in
let* net =
Real.create
~config
~limits
?received_msg_hook
?sent_msg_hook
?broadcasted_msg_hook
peer_cfg
msg_cfg
conn_cfg
in
return
{
announced_version =
Network_version.announced
~chain_name:msg_cfg.chain_name
~distributed_db_versions:msg_cfg.distributed_db_versions
~p2p_versions:P2p_version.supported;
peer_id = Real.peer_id net;
maintain = Real.maintain net;
roll = Real.roll net;
shutdown = Real.shutdown net;
connections = Real.connections net;
find_connection_by_peer_id = Real.find_connection_by_peer_id net;
find_connection_by_point = Real.find_connection_by_point net;
disconnect = Real.disconnect;
connection_info = Real.connection_info net;
connection_local_metadata = Real.connection_local_metadata net;
connection_remote_metadata = Real.connection_remote_metadata net;
connection_stat = Real.connection_stat net;
global_stat = Real.global_stat net;
get_peer_metadata = Real.get_peer_metadata net;
set_peer_metadata = Real.set_peer_metadata net;
connect =
(fun ?trusted ?expected_peer_id ?timeout ->
Real.connect ?trusted ?expected_peer_id ?timeout net);
recv = Real.recv net;
recv_any = Real.recv_any net;
send = Real.send net;
try_send = Real.try_send net;
broadcast = Real.broadcast net;
pool = Some net.pool;
connect_handler = Some net.connect_handler;
fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f);
iter_connections = Real.iter_connections net;
on_new_connection = Real.on_new_connection net;
on_disconnection = Real.on_disconnection net;
negotiated_version = Real.negotiated_version net;
activate = Real.activate net;
watcher = net.Real.watcher;
}
let activate t =
Events.(emit__dont_wait__use_with_care activate_layer) () ;
t.activate ()
let faked_network (msg_cfg : 'msg P2p_params.message_config) peer_cfg
faked_metadata =
let announced_version =
Network_version.announced
~chain_name:msg_cfg.chain_name
~distributed_db_versions:msg_cfg.distributed_db_versions
~p2p_versions:P2p_version.supported
in
{
announced_version;
peer_id = Fake.id.peer_id;
maintain = Lwt_result_syntax.return;
roll = Lwt.return;
shutdown = Lwt.return;
connections = (fun () -> []);
find_connection_by_peer_id = (fun _ -> None);
find_connection_by_point = (fun _ -> None);
disconnect = (fun ?wait:_ ~reason:_ _ -> Lwt.return_unit);
connection_info =
(fun _ -> Fake.connection_info announced_version faked_metadata);
connection_local_metadata = (fun _ -> faked_metadata);
connection_remote_metadata = (fun _ -> faked_metadata);
connection_stat = (fun _ -> Fake.empty_stat);
global_stat = (fun () -> Fake.empty_stat);
get_peer_metadata = (fun _ -> peer_cfg.P2p_params.peer_meta_initial ());
set_peer_metadata = (fun _ _ -> ());
connect =
(fun ?trusted:_ ?expected_peer_id:_ ?timeout:_ _ ->
Lwt_result_syntax.tzfail P2p_errors.Connection_failed);
recv = (fun _ -> Lwt_utils.never_ending ());
recv_any = (fun () -> Lwt_utils.never_ending ());
send = (fun _ _ -> Lwt_result_syntax.tzfail P2p_errors.Connection_closed);
try_send = (fun _ _ -> false);
broadcast = (fun _ ?except:_ ?alt:_ _ -> ());
fold_connections = (fun ~init ~f:_ -> init);
iter_connections = (fun _f -> ());
on_new_connection = (fun _f -> ());
on_disconnection = (fun _f -> ());
negotiated_version = (fun _ -> announced_version);
pool = None;
connect_handler = None;
activate = (fun _ -> ());
watcher = Lwt_watcher.create_input ();
}
let peer_id net = net.peer_id
let maintain net = net.maintain ()
let roll net = net.roll ()
let shutdown net = net.shutdown ()
let connections net = net.connections ()
let disconnect net = net.disconnect
let find_connection_by_peer_id net = net.find_connection_by_peer_id
let find_connection_by_point net = net.find_connection_by_point
let connection_info net = net.connection_info
let connection_local_metadata net = net.connection_local_metadata
let connection_remote_metadata net = net.connection_remote_metadata
let connection_stat net = net.connection_stat
let global_stat net = net.global_stat ()
let get_peer_metadata net = net.get_peer_metadata
let set_peer_metadata net = net.set_peer_metadata
let connect net = net.connect
let recv net = net.recv
let recv_any net = net.recv_any ()
let send net = net.send
let try_send net = net.try_send
let broadcast net connections ?except ?alt =
net.broadcast connections ?except ?alt
let fold_connections net = net.fold_connections
let iter_connections net = net.iter_connections
let on_new_connection net = net.on_new_connection
let on_disconnection net = net.on_disconnection
let greylist_addr net addr =
Option.iter (fun pool -> P2p_pool.greylist_addr pool addr) net.pool
let greylist_peer net peer_id =
Option.iter (fun pool -> P2p_pool.greylist_peer pool peer_id) net.pool
let watcher net = Lwt_watcher.create_stream net.watcher
let negotiated_version net = net.negotiated_version
module Internal_for_tests = struct
let raw_broadcast (connections : ('a, 'b, 'c) P2p_conn.t P2p_peer.Table.t)
?except ?alt =
Real.raw_broadcast connections ?except ?alt
end