https://gitlab.com/tezos/tezos
Raw File
Tip revision: e5df8d3840ea4022271fa14f27645aa6c7bc9dd3 authored by Alain Mebsout on 15 March 2024, 16:39:29 UTC
Doc: changelog
Tip revision: e5df8d3
p2p.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2019-2022 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Events = P2p_events.P2p

type config = {
  listening_port : P2p_addr.port option;
  listening_addr : P2p_addr.t option;
  advertised_port : P2p_addr.port option;
  discovery_port : P2p_addr.port option;
  discovery_addr : Ipaddr.V4.t option;
  trusted_points : (P2p_point.Id.t * P2p_peer.Id.t option) list;
  peers_file : string;
  private_mode : bool;
  identity : P2p_identity.t;
  proof_of_work_target : Tezos_crypto.Crypto_box.pow_target;
  trust_discovered_peers : bool;
  reconnection_config : Point_reconnection_config.t;
  disable_peer_discovery : bool;
}

let create_scheduler limits =
  let open P2p_limits in
  let max_upload_speed = Option.map (( * ) 1024) limits.max_upload_speed in
  let max_download_speed = Option.map (( * ) 1024) limits.max_download_speed in
  P2p_io_scheduler.create
    ~read_buffer_size:limits.read_buffer_size
    ?max_upload_speed
    ?max_download_speed
    ?read_queue_size:limits.read_queue_size
    ?write_queue_size:limits.write_queue_size
    ()

let create_connection_pool config limits meta_cfg log triggers =
  let open P2p_limits in
  let pool_cfg =
    {
      P2p_pool.identity = config.identity;
      trusted_points = config.trusted_points;
      peers_file = config.peers_file;
      private_mode = config.private_mode;
      max_known_points = limits.max_known_points;
      max_known_peer_ids = limits.max_known_peer_ids;
      peer_greylist_size = limits.peer_greylist_size;
      ip_greylist_size_in_kilobytes = limits.ip_greylist_size_in_kilobytes;
      ip_greylist_cleanup_delay = limits.ip_greylist_cleanup_delay;
    }
  in
  P2p_pool.create pool_cfg meta_cfg ~log triggers

let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched
    triggers log answerer =
  let open P2p_limits in
  let connect_handler_cfg =
    {
      P2p_connect_handler.identity = config.identity;
      proof_of_work_target = config.proof_of_work_target;
      listening_port = config.listening_port;
      advertised_port = config.advertised_port;
      private_mode = config.private_mode;
      reconnection_config = config.reconnection_config;
      min_connections = limits.min_connections;
      max_connections = limits.max_connections;
      max_incoming_connections = limits.max_incoming_connections;
      connection_timeout = limits.connection_timeout;
      authentication_timeout = limits.authentication_timeout;
      incoming_app_message_queue_size = limits.incoming_app_message_queue_size;
      incoming_message_queue_size = limits.incoming_message_queue_size;
      outgoing_message_queue_size = limits.outgoing_message_queue_size;
      binary_chunks_size = limits.binary_chunks_size;
      disable_peer_discovery = config.disable_peer_discovery;
    }
  in
  P2p_connect_handler.create
    connect_handler_cfg
    pool
    msg_cfg
    conn_meta_cfg
    io_sched
    triggers
    ~log
    ~answerer

let may_create_discovery_worker _limits config pool =
  match
    (config.listening_port, config.discovery_port, config.discovery_addr)
  with
  | Some listening_port, Some discovery_port, Some discovery_addr ->
      Some
        (P2p_discovery.create
           pool
           config.identity.peer_id
           ~listening_port
           ~discovery_port
           ~discovery_addr
           ~trust_discovered_peers:config.trust_discovered_peers)
  | _, _, _ -> None

let create_maintenance_worker limits pool connect_handler config triggers log =
  let open P2p_limits in
  let open Lwt_syntax in
  match limits.maintenance_idle_time with
  | None -> return_none
  | Some maintenance_idle_time ->
      let maintenance_config =
        {
          P2p_maintenance.maintenance_idle_time;
          private_mode = config.private_mode;
          min_connections = limits.min_connections;
          max_connections = limits.max_connections;
          expected_connections = limits.expected_connections;
          time_between_looking_for_peers =
            Ptime.Span.of_int_s 5
            (* Empirical value. Enough to observe changes in the network,
               and not too long to discover new peers quickly. *)
            (* TODO: https://gitlab.com/tezos/tezos/-/issues/1655
               Check whether the value is optimal or not through integration tests
            *);
        }
      in
      let discovery = may_create_discovery_worker limits config pool in
      return_some
        (P2p_maintenance.create
           ?discovery
           maintenance_config
           pool
           connect_handler
           triggers
           ~log)

let may_create_welcome_worker config limits connect_handler =
  config.listening_port
  |> Option.map_es (fun port ->
         P2p_welcome.create
           ~backlog:limits.P2p_limits.backlog
           connect_handler
           ?addr:config.listening_addr
           port)

type ('msg, 'peer_meta, 'conn_meta) connection =
  ('msg, 'peer_meta, 'conn_meta) P2p_conn.t

module Real = struct
  type ('msg, 'peer_meta, 'conn_meta) net = {
    config : config;
    limits : P2p_limits.t;
    io_sched : P2p_io_scheduler.t;
    pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t;
    connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t;
    maintenance : ('msg, 'peer_meta, 'conn_meta) P2p_maintenance.t option;
    welcome : P2p_welcome.t option;
    watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
    triggers : P2p_trigger.t;
    received_msg_hook :
      ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit;
    sent_msg_hook : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit;
    broadcasted_msg_hook :
      ('msg, 'peer_meta, 'conn_meta) connection P2p_peer.Table.t ->
      ?except:(('msg, 'peer_meta, 'conn_meta) connection -> bool) ->
      ?alt:(('msg, 'peer_meta, 'conn_meta) connection -> bool) * 'msg ->
      'msg ->
      unit;
  }

  let create ~config ~limits ?received_msg_hook ?sent_msg_hook
      ?broadcasted_msg_hook meta_cfg msg_cfg conn_meta_cfg =
    let open Lwt_result_syntax in
    let io_sched = create_scheduler limits in
    let watcher = Lwt_watcher.create_input () in
    let log event = Lwt_watcher.notify watcher event in
    let triggers = P2p_trigger.create () in
    let*! pool = create_connection_pool config limits meta_cfg log triggers in
    (* There is a mutual recursion between an answerer and connect_handler,
       for the default answerer. Because of the swap request mechanism, the
       default answerer needs to initiate new connections using the
       [P2p_connect_handler.connect] callback. *)
    let rec answerer =
      lazy
        (if config.private_mode then P2p_protocol.create_private ()
        else
          let connect =
            P2p_connect_handler.connect (Lazy.force connect_handler)
          in
          let proto_conf =
            {
              P2p_protocol.swap_linger = limits.P2p_limits.swap_linger;
              pool;
              log;
              connect;
              latest_accepted_swap = Ptime.epoch;
              latest_successful_swap = Ptime.epoch;
            }
          in
          P2p_protocol.create_default proto_conf)
    and connect_handler =
      lazy
        (create_connect_handler
           config
           limits
           pool
           msg_cfg
           conn_meta_cfg
           io_sched
           triggers
           log
           answerer)
    in
    let connect_handler = Lazy.force connect_handler in
    let*! maintenance =
      create_maintenance_worker limits pool connect_handler config triggers log
    in
    let* welcome = may_create_welcome_worker config limits connect_handler in
    P2p_metrics_collectors.collect pool io_sched ;
    return
      {
        config;
        limits;
        io_sched;
        pool;
        connect_handler;
        maintenance;
        welcome;
        watcher;
        triggers;
        received_msg_hook =
          Option.value ~default:(fun _ _ -> ()) received_msg_hook;
        sent_msg_hook = Option.value ~default:(fun _ _ -> ()) sent_msg_hook;
        broadcasted_msg_hook =
          Option.value
            ~default:(fun _ ?except:_ ?alt:_ _ -> ())
            broadcasted_msg_hook;
      }

  let peer_id {config; _} = config.identity.peer_id

  let maintain {maintenance; _} () =
    let open Lwt_result_syntax in
    match maintenance with
    | Some maintenance ->
        let*! () = P2p_maintenance.maintain maintenance in
        return_unit
    | None -> tzfail P2p_errors.Maintenance_disabled

  let activate t () =
    Events.(emit__dont_wait__use_with_care activate_network)
      t.config.identity.peer_id ;
    (match t.welcome with None -> () | Some w -> P2p_welcome.activate w) ;
    match t.maintenance with
    | Some maintenance -> P2p_maintenance.activate maintenance
    | None -> ()

  (* TODO: https://gitlab.com/tezos/tezos/-/issues/4597
     Implement [roll] function. *)
  let roll _net () = Lwt.return_unit

  (* returns when all workers have shut down in the opposite
     creation order. *)
  let shutdown net () =
    let open Lwt_syntax in
    let* () = Events.(emit shutdown_welcome_worker) () in
    let* () = Option.iter_s P2p_welcome.shutdown net.welcome in
    let* () = Events.(emit shutdown_maintenance_worker) () in
    let* () =
      Option.iter_s
        (fun maintenance -> P2p_maintenance.shutdown maintenance)
        net.maintenance
    in
    let* () = Events.(emit shutdown_connection_pool) () in
    let* () = P2p_pool.destroy net.pool in
    let* () = Events.(emit shutdown_connection_handler) () in
    let* () = P2p_connect_handler.destroy net.connect_handler in
    let* () = Events.(emit shutdown_scheduler) () in
    P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched

  let connections {pool; _} () =
    P2p_pool.Connection.fold pool ~init:[] ~f:(fun _peer_id c acc -> c :: acc)

  let find_connection_by_peer_id {pool; _} peer_id =
    P2p_pool.Connection.find_by_peer_id pool peer_id

  let find_connection_by_point {pool; _} point =
    P2p_pool.Connection.find_by_point pool point

  let disconnect ?wait ~reason = P2p_conn.disconnect ?wait ~reason:(User reason)

  let connection_info _net conn = P2p_conn.info conn

  let connection_local_metadata _net conn = P2p_conn.local_metadata conn

  let connection_remote_metadata _net conn = P2p_conn.remote_metadata conn

  let connection_stat _net conn = P2p_conn.stat conn

  let global_stat {connect_handler; _} () =
    P2p_connect_handler.stat connect_handler

  let set_peer_metadata {pool; _} conn meta =
    P2p_pool.Peers.set_peer_metadata pool conn meta

  let get_peer_metadata {pool; _} conn =
    P2p_pool.Peers.get_peer_metadata pool conn

  let connect ?trusted ?expected_peer_id ?timeout net point =
    P2p_connect_handler.connect
      ?trusted
      ?expected_peer_id
      ?timeout
      net.connect_handler
      point

  let recv net conn =
    let open Lwt_syntax in
    let* msg = P2p_conn.read conn in
    let peer_id = (P2p_conn.info conn).peer_id in
    let* () =
      match msg with
      | Ok msg ->
          let* () = Events.(emit message_read) peer_id in
          net.received_msg_hook conn msg ;
          return_unit
      | Error _ -> Events.(emit message_read_error) peer_id
    in
    return msg

  let rec recv_any net () =
    let open Lwt_syntax in
    let pipes =
      P2p_pool.Connection.fold net.pool ~init:[] ~f:(fun _peer_id conn acc ->
          (let* r = P2p_conn.is_readable conn in
           match r with
           | Ok () -> Lwt.return_some conn
           | Error _ -> Lwt_utils.never_ending ())
          :: acc)
    in
    let new_connection =
      let* () = P2p_trigger.wait_new_connection net.triggers in
      Lwt.return_none
    in
    let* o = Lwt.pick (new_connection :: pipes) in
    match o with
    | None -> recv_any net ()
    | Some conn -> (
        let* r = recv net conn in
        match r with
        | Ok msg ->
            net.received_msg_hook conn msg ;
            Lwt.return (conn, msg)
        | Error _ ->
            let* () = Lwt.pause () in
            recv_any net ())

  let send net conn m =
    let open Lwt_result_syntax in
    let*! r = P2p_conn.write conn m in
    let*! () =
      match r with
      | Ok () ->
          let peer_id = (P2p_conn.info conn).peer_id in
          let*! () = Events.(emit message_sent) peer_id in
          net.sent_msg_hook conn m ;
          Lwt.return_unit
      | Error trace ->
          Events.(emit sending_message_error)
            ((P2p_conn.info conn).peer_id, trace)
    in
    Lwt.return r

  let try_send net conn m =
    match P2p_conn.write_now conn m with
    | Ok v ->
        Events.(emit__dont_wait__use_with_care message_trysent)
          ((P2p_conn.info conn).peer_id, v) ;
        if v then net.sent_msg_hook conn m ;
        v
    | Error err ->
        Events.(emit__dont_wait__use_with_care trysending_message_error)
          ((P2p_conn.info conn).peer_id, err) ;
        false

  (* Memoization of broadcast encoded [msg] inside a buffer [buf]. *)
  (* For generalisation purposes, each connection has a `writer` that
     defines a specific encoding for messages. Currently we use the
     same encoding for every connection. It makes this simple
     memoization possible but will need modifications if connections
     have specialised encodings. *)
  let broadcast_encode conn buff msg =
    let open Result_syntax in
    match !buff with
    | None ->
        let* encoded_msg = P2p_conn.encode conn msg in
        buff := Some encoded_msg ;
        return encoded_msg
    | Some em -> return em

  let send_conn ?alt conn buf alt_buf msg =
    let open Result_syntax in
    (* Silently discards Error P2p_errors.Connection_closed in case
                  the pipe is closed. Shouldn't happen because
       - no race conditions (no Lwt)
       - the peer state is Running.

       Also ignore if the message is dropped instead of being added
       to the write queue. *)
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4205
       Ensure sent messages are actually sent.
    *)
    ignore
    @@ let* encoded_msg =
         match alt with
         | None -> broadcast_encode conn buf msg
         | Some (if_conn, then_msg) ->
             if if_conn conn then broadcast_encode conn alt_buf then_msg
             else broadcast_encode conn buf msg
       in
       P2p_conn.write_encoded_now
         conn
         (P2p_socket.copy_encoded_message encoded_msg)

  let raw_broadcast connections ?except ?alt msg =
    let buf = ref None in
    let alt_buf = ref None in
    let send conn = send_conn ?alt conn buf alt_buf msg in
    P2p_peer.Table.iter
      (fun _peer_id conn ->
        match except with
        | None -> send conn
        | Some f when not (f conn) -> send conn
        | _ -> ())
      connections ;
    Events.(emit__dont_wait__use_with_care broadcast) ()

  let broadcast net connections ?except ?alt msg =
    raw_broadcast connections ?except ?alt msg ;
    net.broadcasted_msg_hook connections ?except ?alt msg

  let fold_connections {pool; _} ~init ~f =
    P2p_pool.Connection.fold pool ~init ~f

  let iter_connections {pool; _} f =
    P2p_pool.Connection.fold pool ~init:() ~f:(fun gid conn () -> f gid conn)

  let on_new_connection {connect_handler; _} f =
    P2p_connect_handler.on_new_connection connect_handler f

  let on_disconnection {connect_handler; _} f =
    P2p_connect_handler.on_disconnection connect_handler f

  let negotiated_version _ conn = P2p_conn.negotiated_version conn
end

module Fake = struct
  let id = P2p_identity.generate_with_pow_target_0 ()

  let empty_stat =
    {
      P2p_stat.total_sent = 0L;
      total_recv = 0L;
      current_inflow = 0;
      current_outflow = 0;
    }

  let connection_info announced_version faked_metadata =
    {
      P2p_connection.Info.incoming = false;
      peer_id = id.peer_id;
      id_point = (Ipaddr.V6.unspecified, None);
      remote_socket_port = 0;
      announced_version;
      local_metadata = faked_metadata;
      remote_metadata = faked_metadata;
      private_node = false;
    }
end

type ('msg, 'peer_meta, 'conn_meta) t = {
  announced_version : Network_version.t;
  peer_id : P2p_peer.Id.t;
  maintain : unit -> unit tzresult Lwt.t;
  roll : unit -> unit Lwt.t;
  shutdown : unit -> unit Lwt.t;
  connections : unit -> ('msg, 'peer_meta, 'conn_meta) connection list;
  find_connection_by_peer_id :
    P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
  find_connection_by_point :
    P2p_point.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
  disconnect :
    ?wait:bool ->
    reason:string ->
    ('msg, 'peer_meta, 'conn_meta) connection ->
    unit Lwt.t;
  connection_info :
    ('msg, 'peer_meta, 'conn_meta) connection ->
    'conn_meta P2p_connection.Info.t;
  connection_local_metadata :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
  connection_remote_metadata :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
  connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t;
  global_stat : unit -> P2p_stat.t;
  get_peer_metadata : P2p_peer.Id.t -> 'peer_meta;
  set_peer_metadata : P2p_peer.Id.t -> 'peer_meta -> unit;
  connect :
    ?trusted:bool ->
    ?expected_peer_id:P2p_peer.Id.t ->
    ?timeout:Ptime.span ->
    P2p_point.Id.t ->
    ('msg, 'peer_meta, 'conn_meta) connection tzresult Lwt.t;
  recv : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg tzresult Lwt.t;
  recv_any : unit -> (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t;
  send :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit tzresult Lwt.t;
  try_send : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> bool;
  broadcast :
    ('msg, 'peer_meta, 'conn_meta) connection P2p_peer.Table.t ->
    ?except:(('msg, 'peer_meta, 'conn_meta) connection -> bool) ->
    ?alt:(('msg, 'peer_meta, 'conn_meta) connection -> bool) * 'msg ->
    'msg ->
    unit;
  pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t option;
  connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t option;
  fold_connections :
    'a.
    init:'a ->
    f:(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) ->
    'a;
  iter_connections :
    (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
  on_new_connection :
    (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
  on_disconnection : (P2p_peer.Id.t -> unit) -> unit;
  negotiated_version :
    ('msg, 'peer_meta, 'conn_meta) connection -> Network_version.t;
  activate : unit -> unit;
  watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
}

type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t

let announced_version net = net.announced_version

let pool net = net.pool

let connect_handler net = net.connect_handler

let check_limits =
  let open Result_syntax in
  let open P2p_limits in
  let fail_1 v orig =
    if not (Ptime.Span.compare v Ptime.Span.zero <= 0) then return_unit
    else
      Error_monad.error_with
        "value of option %S cannot be negative or null@."
        orig
  in
  let fail_2 v orig =
    if not (v < 0) then return_unit
    else Error_monad.error_with "value of option %S cannot be negative@." orig
  in
  fun c ->
    let* () = fail_1 c.authentication_timeout "authentication-timeout" in
    let* () = fail_2 c.min_connections "min-connections" in
    let* () = fail_2 c.expected_connections "expected-connections" in
    let* () = fail_2 c.max_connections "max-connections" in
    let* () = fail_2 c.max_incoming_connections "max-incoming-connections" in
    let* () = fail_2 c.read_buffer_size "read-buffer-size" in
    let* () =
      match c.swap_linger with
      | Some swap_linger -> fail_1 swap_linger "swap-linger"
      | None -> return_unit
    in
    let* () =
      match c.binary_chunks_size with
      | None -> return_unit
      | Some size -> P2p_socket.check_binary_chunks_size size
    in
    return_unit

let create ~config ~limits ?received_msg_hook ?sent_msg_hook
    ?broadcasted_msg_hook peer_cfg conn_cfg msg_cfg =
  let open Lwt_result_syntax in
  let*? () = check_limits limits in
  let* net =
    Real.create
      ~config
      ~limits
      ?received_msg_hook
      ?sent_msg_hook
      ?broadcasted_msg_hook
      peer_cfg
      msg_cfg
      conn_cfg
  in
  return
    {
      announced_version =
        Network_version.announced
          ~chain_name:msg_cfg.chain_name
          ~distributed_db_versions:msg_cfg.distributed_db_versions
          ~p2p_versions:P2p_version.supported;
      peer_id = Real.peer_id net;
      maintain = Real.maintain net;
      roll = Real.roll net;
      shutdown = Real.shutdown net;
      connections = Real.connections net;
      find_connection_by_peer_id = Real.find_connection_by_peer_id net;
      find_connection_by_point = Real.find_connection_by_point net;
      disconnect = Real.disconnect;
      connection_info = Real.connection_info net;
      connection_local_metadata = Real.connection_local_metadata net;
      connection_remote_metadata = Real.connection_remote_metadata net;
      connection_stat = Real.connection_stat net;
      global_stat = Real.global_stat net;
      get_peer_metadata = Real.get_peer_metadata net;
      set_peer_metadata = Real.set_peer_metadata net;
      connect =
        (fun ?trusted ?expected_peer_id ?timeout ->
          Real.connect ?trusted ?expected_peer_id ?timeout net);
      recv = Real.recv net;
      recv_any = Real.recv_any net;
      send = Real.send net;
      try_send = Real.try_send net;
      broadcast = Real.broadcast net;
      pool = Some net.pool;
      connect_handler = Some net.connect_handler;
      fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f);
      iter_connections = Real.iter_connections net;
      on_new_connection = Real.on_new_connection net;
      on_disconnection = Real.on_disconnection net;
      negotiated_version = Real.negotiated_version net;
      activate = Real.activate net;
      watcher = net.Real.watcher;
    }

let activate t =
  Events.(emit__dont_wait__use_with_care activate_layer) () ;
  t.activate ()

let faked_network (msg_cfg : 'msg P2p_params.message_config) peer_cfg
    faked_metadata =
  let announced_version =
    Network_version.announced
      ~chain_name:msg_cfg.chain_name
      ~distributed_db_versions:msg_cfg.distributed_db_versions
      ~p2p_versions:P2p_version.supported
  in
  {
    announced_version;
    peer_id = Fake.id.peer_id;
    maintain = Lwt_result_syntax.return;
    roll = Lwt.return;
    shutdown = Lwt.return;
    connections = (fun () -> []);
    find_connection_by_peer_id = (fun _ -> None);
    find_connection_by_point = (fun _ -> None);
    disconnect = (fun ?wait:_ ~reason:_ _ -> Lwt.return_unit);
    connection_info =
      (fun _ -> Fake.connection_info announced_version faked_metadata);
    connection_local_metadata = (fun _ -> faked_metadata);
    connection_remote_metadata = (fun _ -> faked_metadata);
    connection_stat = (fun _ -> Fake.empty_stat);
    global_stat = (fun () -> Fake.empty_stat);
    get_peer_metadata = (fun _ -> peer_cfg.P2p_params.peer_meta_initial ());
    set_peer_metadata = (fun _ _ -> ());
    connect =
      (fun ?trusted:_ ?expected_peer_id:_ ?timeout:_ _ ->
        Lwt_result_syntax.tzfail P2p_errors.Connection_failed);
    recv = (fun _ -> Lwt_utils.never_ending ());
    recv_any = (fun () -> Lwt_utils.never_ending ());
    send = (fun _ _ -> Lwt_result_syntax.tzfail P2p_errors.Connection_closed);
    try_send = (fun _ _ -> false);
    broadcast = (fun _ ?except:_ ?alt:_ _ -> ());
    fold_connections = (fun ~init ~f:_ -> init);
    iter_connections = (fun _f -> ());
    on_new_connection = (fun _f -> ());
    on_disconnection = (fun _f -> ());
    negotiated_version = (fun _ -> announced_version);
    pool = None;
    connect_handler = None;
    activate = (fun _ -> ());
    watcher = Lwt_watcher.create_input ();
  }

let peer_id net = net.peer_id

let maintain net = net.maintain ()

let roll net = net.roll ()

let shutdown net = net.shutdown ()

let connections net = net.connections ()

let disconnect net = net.disconnect

let find_connection_by_peer_id net = net.find_connection_by_peer_id

let find_connection_by_point net = net.find_connection_by_point

let connection_info net = net.connection_info

let connection_local_metadata net = net.connection_local_metadata

let connection_remote_metadata net = net.connection_remote_metadata

let connection_stat net = net.connection_stat

let global_stat net = net.global_stat ()

let get_peer_metadata net = net.get_peer_metadata

let set_peer_metadata net = net.set_peer_metadata

let connect net = net.connect

let recv net = net.recv

let recv_any net = net.recv_any ()

let send net = net.send

let try_send net = net.try_send

let broadcast net connections ?except ?alt =
  net.broadcast connections ?except ?alt

let fold_connections net = net.fold_connections

let iter_connections net = net.iter_connections

let on_new_connection net = net.on_new_connection

let on_disconnection net = net.on_disconnection

let greylist_addr net addr =
  Option.iter (fun pool -> P2p_pool.greylist_addr pool addr) net.pool

let greylist_peer net peer_id =
  Option.iter (fun pool -> P2p_pool.greylist_peer pool peer_id) net.pool

let watcher net = Lwt_watcher.create_stream net.watcher

let negotiated_version net = net.negotiated_version

module Internal_for_tests = struct
  let raw_broadcast (connections : ('a, 'b, 'c) P2p_conn.t P2p_peer.Table.t)
      ?except ?alt =
    Real.raw_broadcast connections ?except ?alt
end
back to top