swh:1:snp:505c374fd75bb208ae4e9a54e64bb310bc49295e
Raw File
Tip revision: 85e9d7cc7d205511e47de8ba2b5a4be0eb864166 authored by mujx on 26 October 2021, 13:11:55 UTC
CI: Use a Gitlab native solution to find changed files
Tip revision: 85e9d7c
p2p.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2019-2020 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Events = P2p_events.P2p

type config = {
  listening_port : P2p_addr.port option;
  listening_addr : P2p_addr.t option;
  discovery_port : P2p_addr.port option;
  discovery_addr : Ipaddr.V4.t option;
  trusted_points : (P2p_point.Id.t * P2p_peer.Id.t option) list;
  peers_file : string;
  private_mode : bool;
  identity : P2p_identity.t;
  proof_of_work_target : Crypto_box.pow_target;
  trust_discovered_peers : bool;
  reconnection_config : P2p_point_state.Info.reconnection_config;
}

type limits = {
  connection_timeout : Time.System.Span.t;
  authentication_timeout : Time.System.Span.t;
  greylist_timeout : Time.System.Span.t;
  maintenance_idle_time : Time.System.Span.t;
  min_connections : int;
  expected_connections : int;
  max_connections : int;
  backlog : int;
  max_incoming_connections : int;
  max_download_speed : int option;
  max_upload_speed : int option;
  read_buffer_size : int;
  read_queue_size : int option;
  write_queue_size : int option;
  incoming_app_message_queue_size : int option;
  incoming_message_queue_size : int option;
  outgoing_message_queue_size : int option;
  max_known_peer_ids : (int * int) option;
  max_known_points : (int * int) option;
  peer_greylist_size : int;
  ip_greylist_size_in_kilobytes : int;
  ip_greylist_cleanup_delay : Time.System.Span.t;
  swap_linger : Time.System.Span.t;
  binary_chunks_size : int option;
}

let create_scheduler limits =
  let max_upload_speed = Option.map (( * ) 1024) limits.max_upload_speed in
  let max_download_speed = Option.map (( * ) 1024) limits.max_download_speed in
  P2p_io_scheduler.create
    ~read_buffer_size:limits.read_buffer_size
    ?max_upload_speed
    ?max_download_speed
    ?read_queue_size:limits.read_queue_size
    ?write_queue_size:limits.write_queue_size
    ()

let create_connection_pool config limits meta_cfg log triggers =
  let pool_cfg =
    {
      P2p_pool.identity = config.identity;
      trusted_points = config.trusted_points;
      peers_file = config.peers_file;
      private_mode = config.private_mode;
      max_known_points = limits.max_known_points;
      max_known_peer_ids = limits.max_known_peer_ids;
      peer_greylist_size = limits.peer_greylist_size;
      ip_greylist_size_in_kilobytes = limits.ip_greylist_size_in_kilobytes;
      ip_greylist_cleanup_delay = limits.ip_greylist_cleanup_delay;
    }
  in
  P2p_pool.create pool_cfg meta_cfg ~log triggers

let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched
    triggers log answerer =
  let connect_handler_cfg =
    {
      P2p_connect_handler.identity = config.identity;
      proof_of_work_target = config.proof_of_work_target;
      listening_port = config.listening_port;
      private_mode = config.private_mode;
      reconnection_config = config.reconnection_config;
      min_connections = limits.min_connections;
      max_connections = limits.max_connections;
      max_incoming_connections = limits.max_incoming_connections;
      connection_timeout = limits.connection_timeout;
      authentication_timeout = limits.authentication_timeout;
      incoming_app_message_queue_size = limits.incoming_app_message_queue_size;
      incoming_message_queue_size = limits.incoming_message_queue_size;
      outgoing_message_queue_size = limits.outgoing_message_queue_size;
      binary_chunks_size = limits.binary_chunks_size;
    }
  in
  P2p_connect_handler.create
    connect_handler_cfg
    pool
    msg_cfg
    conn_meta_cfg
    io_sched
    triggers
    ~log
    ~answerer

let may_create_discovery_worker _limits config pool =
  match
    (config.listening_port, config.discovery_port, config.discovery_addr)
  with
  | (Some listening_port, Some discovery_port, Some discovery_addr) ->
      Some
        (P2p_discovery.create
           pool
           config.identity.peer_id
           ~listening_port
           ~discovery_port
           ~discovery_addr
           ~trust_discovered_peers:config.trust_discovered_peers)
  | (_, _, _) -> None

let create_maintenance_worker limits pool connect_handler config triggers log =
  let maintenance_config =
    {
      P2p_maintenance.maintenance_idle_time = limits.maintenance_idle_time;
      private_mode = config.private_mode;
      min_connections = limits.min_connections;
      max_connections = limits.max_connections;
      expected_connections = limits.expected_connections;
      time_between_looking_for_peers =
        Ptime.Span.of_int_s 5
        (* Empirical value. Enough to observe changes in the network,
           and not too long to discover new peers quickly. *)
        (* TODO: https://gitlab.com/tezos/tezos/-/issues/1655
           Check whether the value is optimal or not through integration tests
        *);
    }
  in
  let discovery = may_create_discovery_worker limits config pool in
  P2p_maintenance.create
    ?discovery
    maintenance_config
    pool
    connect_handler
    triggers
    ~log

let may_create_welcome_worker config limits connect_handler =
  config.listening_port
  |> Option.map_es (fun port ->
         P2p_welcome.create
           ~backlog:limits.backlog
           connect_handler
           ?addr:config.listening_addr
           port)

type ('msg, 'peer_meta, 'conn_meta) connection =
  ('msg, 'peer_meta, 'conn_meta) P2p_conn.t

module Real = struct
  type ('msg, 'peer_meta, 'conn_meta) net = {
    config : config;
    limits : limits;
    io_sched : P2p_io_scheduler.t;
    pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t;
    connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t;
    maintenance : ('msg, 'peer_meta, 'conn_meta) P2p_maintenance.t;
    welcome : P2p_welcome.t option;
    watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
    triggers : P2p_trigger.t;
  }

  let create ~config ~limits meta_cfg msg_cfg conn_meta_cfg =
    let io_sched = create_scheduler limits in
    let watcher = Lwt_watcher.create_input () in
    let log event = Lwt_watcher.notify watcher event in
    let triggers = P2p_trigger.create () in
    create_connection_pool config limits meta_cfg log triggers >>= fun pool ->
    (* There is a mutual recursion between an answerer and connect_handler,
       for the default answerer. Because of the swap request mechanism, the
       default answerer needs to initiate new connections using the
       [P2p_connect_handler.connect] callback. *)
    let rec answerer =
      lazy
        (if config.private_mode then P2p_protocol.create_private ()
        else
          let connect =
            P2p_connect_handler.connect (Lazy.force connect_handler)
          in
          let proto_conf =
            {
              P2p_protocol.swap_linger = limits.swap_linger;
              pool;
              log;
              connect;
              latest_accepted_swap = Ptime.epoch;
              latest_successful_swap = Ptime.epoch;
            }
          in
          P2p_protocol.create_default proto_conf)
    and connect_handler =
      lazy
        (create_connect_handler
           config
           limits
           pool
           msg_cfg
           conn_meta_cfg
           io_sched
           triggers
           log
           answerer)
    in
    let connect_handler = Lazy.force connect_handler in
    let maintenance =
      create_maintenance_worker limits pool connect_handler config triggers log
    in
    may_create_welcome_worker config limits connect_handler >>=? fun welcome ->
    return
      {
        config;
        limits;
        io_sched;
        pool;
        connect_handler;
        maintenance;
        welcome;
        watcher;
        triggers;
      }

  let peer_id {config; _} = config.identity.peer_id

  let maintain {maintenance; _} () = P2p_maintenance.maintain maintenance

  let activate t () =
    Events.(emit__dont_wait__use_with_care activate_network) () ;
    (match t.welcome with None -> () | Some w -> P2p_welcome.activate w) ;
    P2p_maintenance.activate t.maintenance ;
    ()

  let roll _net () = Lwt.return_unit (* TODO implement *)

  (* returns when all workers have shut down in the opposite
     creation order. *)
  let shutdown net () =
    Events.(emit shutdown_welcome_worker) () >>= fun () ->
    Option.iter_s P2p_welcome.shutdown net.welcome >>= fun () ->
    Events.(emit shutdown_maintenance_worker) () >>= fun () ->
    P2p_maintenance.shutdown net.maintenance >>= fun () ->
    Events.(emit shutdown_connection_pool) () >>= fun () ->
    P2p_pool.destroy net.pool >>= fun () ->
    Events.(emit shutdown_connection_handler) () >>= fun () ->
    P2p_connect_handler.destroy net.connect_handler >>= fun () ->
    Events.(emit shutdown_scheduler) () >>= fun () ->
    P2p_io_scheduler.shutdown ~timeout:3.0 net.io_sched

  let connections {pool; _} () =
    P2p_pool.Connection.fold pool ~init:[] ~f:(fun _peer_id c acc -> c :: acc)

  let find_connection_by_peer_id {pool; _} peer_id =
    P2p_pool.Connection.find_by_peer_id pool peer_id

  let find_connection_by_point {pool; _} point =
    P2p_pool.Connection.find_by_point pool point

  let disconnect ?wait conn = P2p_conn.disconnect ?wait conn

  let connection_info _net conn = P2p_conn.info conn

  let connection_local_metadata _net conn = P2p_conn.local_metadata conn

  let connection_remote_metadata _net conn = P2p_conn.remote_metadata conn

  let connection_stat _net conn = P2p_conn.stat conn

  let global_stat {connect_handler; _} () =
    P2p_connect_handler.stat connect_handler

  let set_peer_metadata {pool; _} conn meta =
    P2p_pool.Peers.set_peer_metadata pool conn meta

  let get_peer_metadata {pool; _} conn =
    P2p_pool.Peers.get_peer_metadata pool conn

  let connect ?timeout net point =
    P2p_connect_handler.connect ?timeout net.connect_handler point

  let recv _net conn =
    P2p_conn.read conn >>=? fun msg ->
    Events.(emit message_read) (P2p_conn.info conn).peer_id >>= fun () ->
    return msg

  let rec recv_any net () =
    let pipes =
      P2p_pool.Connection.fold net.pool ~init:[] ~f:(fun _peer_id conn acc ->
          (P2p_conn.is_readable conn >>= function
           | Ok () -> Lwt.return_some conn
           | Error _ -> Lwt_utils.never_ending ())
          :: acc)
    in
    Lwt.pick
      (( P2p_trigger.wait_new_connection net.triggers >>= fun () ->
         Lwt.return_none )
       :: pipes)
    >>= function
    | None -> recv_any net ()
    | Some conn -> (
        P2p_conn.read conn >>= function
        | Ok msg ->
            Events.(emit message_read) (P2p_conn.info conn).peer_id
            >>= fun () -> Lwt.return (conn, msg)
        | Error _ ->
            Events.(emit message_read_error) (P2p_conn.info conn).peer_id
            >>= fun () ->
            Lwt_unix.yield () >>= fun () -> recv_any net ())

  let send _net conn m =
    P2p_conn.write conn m >>= function
    | Ok () ->
        Events.(emit message_sent) (P2p_conn.info conn).peer_id >>= fun () ->
        return_unit
    | Error err ->
        Events.(emit sending_message_error) ((P2p_conn.info conn).peer_id, err)
        >>= fun () -> Lwt.return_error err

  let try_send _net conn v =
    match P2p_conn.write_now conn v with
    | Ok v ->
        Events.(emit__dont_wait__use_with_care message_trysent)
          (P2p_conn.info conn).peer_id ;
        v
    | Error err ->
        Events.(emit__dont_wait__use_with_care trysending_message_error)
          ((P2p_conn.info conn).peer_id, err) ;
        false

  let broadcast {pool; _} msg =
    P2p_peer.Table.iter
      (fun _peer_id peer_info ->
        match P2p_peer_state.get peer_info with
        | Running {data = conn; _} ->
            (* Silently discards Error P2p_errors.Connection_closed in case
                the pipe is closed. Shouldn't happen because
                - no race conditions (no Lwt)
                - the peer state is Running. *)
            ignore (P2p_conn.write_now conn msg : bool tzresult)
        | _ -> ())
      (P2p_pool.connected_peer_ids pool) ;
    Events.(emit__dont_wait__use_with_care broadcast) ()

  let fold_connections {pool; _} ~init ~f =
    P2p_pool.Connection.fold pool ~init ~f

  let iter_connections {pool; _} f =
    P2p_pool.Connection.fold pool ~init:() ~f:(fun gid conn () -> f gid conn)

  let on_new_connection {connect_handler; _} f =
    P2p_connect_handler.on_new_connection connect_handler f
end

module Fake = struct
  let id = P2p_identity.generate_with_pow_target_0 ()

  let empty_stat =
    {
      P2p_stat.total_sent = 0L;
      total_recv = 0L;
      current_inflow = 0;
      current_outflow = 0;
    }

  let connection_info announced_version faked_metadata =
    {
      P2p_connection.Info.incoming = false;
      peer_id = id.peer_id;
      id_point = (Ipaddr.V6.unspecified, None);
      remote_socket_port = 0;
      announced_version;
      local_metadata = faked_metadata;
      remote_metadata = faked_metadata;
      private_node = false;
    }
end

type ('msg, 'peer_meta, 'conn_meta) t = {
  announced_version : Network_version.t;
  peer_id : P2p_peer.Id.t;
  maintain : unit -> unit Lwt.t;
  roll : unit -> unit Lwt.t;
  shutdown : unit -> unit Lwt.t;
  connections : unit -> ('msg, 'peer_meta, 'conn_meta) connection list;
  find_connection_by_peer_id :
    P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
  find_connection_by_point :
    P2p_point.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection option;
  disconnect :
    ?wait:bool -> ('msg, 'peer_meta, 'conn_meta) connection -> unit Lwt.t;
  connection_info :
    ('msg, 'peer_meta, 'conn_meta) connection ->
    'conn_meta P2p_connection.Info.t;
  connection_local_metadata :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
  connection_remote_metadata :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'conn_meta;
  connection_stat : ('msg, 'peer_meta, 'conn_meta) connection -> P2p_stat.t;
  global_stat : unit -> P2p_stat.t;
  get_peer_metadata : P2p_peer.Id.t -> 'peer_meta;
  set_peer_metadata : P2p_peer.Id.t -> 'peer_meta -> unit;
  connect :
    ?timeout:Ptime.span ->
    P2p_point.Id.t ->
    ('msg, 'peer_meta, 'conn_meta) connection tzresult Lwt.t;
  recv : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg tzresult Lwt.t;
  recv_any : unit -> (('msg, 'peer_meta, 'conn_meta) connection * 'msg) Lwt.t;
  send :
    ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit tzresult Lwt.t;
  try_send : ('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> bool;
  broadcast : 'msg -> unit;
  pool : ('msg, 'peer_meta, 'conn_meta) P2p_pool.t option;
  connect_handler : ('msg, 'peer_meta, 'conn_meta) P2p_connect_handler.t option;
  fold_connections :
    'a.
    init:'a ->
    f:(P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> 'a -> 'a) ->
    'a;
  iter_connections :
    (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
  on_new_connection :
    (P2p_peer.Id.t -> ('msg, 'peer_meta, 'conn_meta) connection -> unit) -> unit;
  activate : unit -> unit;
  watcher : P2p_connection.P2p_event.t Lwt_watcher.input;
}

type ('msg, 'peer_meta, 'conn_meta) net = ('msg, 'peer_meta, 'conn_meta) t

let announced_version net = net.announced_version

let pool net = net.pool

let connect_handler net = net.connect_handler

let check_limits =
  let fail_1 v orig =
    if not (Ptime.Span.compare v Ptime.Span.zero <= 0) then return_unit
    else
      Error_monad.failwith
        "value of option %S cannot be negative or null@."
        orig
  in
  let fail_2 v orig =
    if not (v < 0) then return_unit
    else Error_monad.failwith "value of option %S cannot be negative@." orig
  in
  fun c ->
    fail_1 c.authentication_timeout "authentication-timeout" >>=? fun () ->
    fail_2 c.min_connections "min-connections" >>=? fun () ->
    fail_2 c.expected_connections "expected-connections" >>=? fun () ->
    fail_2 c.max_connections "max-connections" >>=? fun () ->
    fail_2 c.max_incoming_connections "max-incoming-connections" >>=? fun () ->
    fail_2 c.read_buffer_size "read-buffer-size" >>=? fun () ->
    fail_1 c.swap_linger "swap-linger" >>=? fun () ->
    (match c.binary_chunks_size with
    | None -> return_unit
    | Some size -> P2p_socket.check_binary_chunks_size size)
    >>=? fun () -> return_unit

let create ~config ~limits peer_cfg conn_cfg msg_cfg =
  check_limits limits >>=? fun () ->
  Real.create ~config ~limits peer_cfg msg_cfg conn_cfg >>=? fun net ->
  return
    {
      announced_version =
        Network_version.announced
          ~chain_name:msg_cfg.chain_name
          ~distributed_db_versions:msg_cfg.distributed_db_versions
          ~p2p_versions:P2p_version.supported;
      peer_id = Real.peer_id net;
      maintain = Real.maintain net;
      roll = Real.roll net;
      shutdown = Real.shutdown net;
      connections = Real.connections net;
      find_connection_by_peer_id = Real.find_connection_by_peer_id net;
      find_connection_by_point = Real.find_connection_by_point net;
      disconnect = Real.disconnect;
      connection_info = Real.connection_info net;
      connection_local_metadata = Real.connection_local_metadata net;
      connection_remote_metadata = Real.connection_remote_metadata net;
      connection_stat = Real.connection_stat net;
      global_stat = Real.global_stat net;
      get_peer_metadata = Real.get_peer_metadata net;
      set_peer_metadata = Real.set_peer_metadata net;
      connect = (fun ?timeout -> Real.connect ?timeout net);
      recv = Real.recv net;
      recv_any = Real.recv_any net;
      send = Real.send net;
      try_send = Real.try_send net;
      broadcast = Real.broadcast net;
      pool = Some net.pool;
      connect_handler = Some net.connect_handler;
      fold_connections = (fun ~init ~f -> Real.fold_connections net ~init ~f);
      iter_connections = Real.iter_connections net;
      on_new_connection = Real.on_new_connection net;
      activate = Real.activate net;
      watcher = net.Real.watcher;
    }

let activate t =
  Events.(emit__dont_wait__use_with_care activate_layer) () ;
  t.activate ()

let faked_network (msg_cfg : 'msg P2p_params.message_config) peer_cfg
    faked_metadata =
  let announced_version =
    Network_version.announced
      ~chain_name:msg_cfg.chain_name
      ~distributed_db_versions:msg_cfg.distributed_db_versions
      ~p2p_versions:P2p_version.supported
  in
  {
    announced_version;
    peer_id = Fake.id.peer_id;
    maintain = Lwt.return;
    roll = Lwt.return;
    shutdown = Lwt.return;
    connections = (fun () -> []);
    find_connection_by_peer_id = (fun _ -> None);
    find_connection_by_point = (fun _ -> None);
    disconnect = (fun ?wait:_ _ -> Lwt.return_unit);
    connection_info =
      (fun _ -> Fake.connection_info announced_version faked_metadata);
    connection_local_metadata = (fun _ -> faked_metadata);
    connection_remote_metadata = (fun _ -> faked_metadata);
    connection_stat = (fun _ -> Fake.empty_stat);
    global_stat = (fun () -> Fake.empty_stat);
    get_peer_metadata = (fun _ -> peer_cfg.P2p_params.peer_meta_initial ());
    set_peer_metadata = (fun _ _ -> ());
    connect = (fun ?timeout:_ _ -> fail P2p_errors.Connection_refused);
    recv = (fun _ -> Lwt_utils.never_ending ());
    recv_any = (fun () -> Lwt_utils.never_ending ());
    send = (fun _ _ -> fail P2p_errors.Connection_closed);
    try_send = (fun _ _ -> false);
    fold_connections = (fun ~init ~f:_ -> init);
    iter_connections = (fun _f -> ());
    on_new_connection = (fun _f -> ());
    broadcast = ignore;
    pool = None;
    connect_handler = None;
    activate = (fun _ -> ());
    watcher = Lwt_watcher.create_input ();
  }

let peer_id net = net.peer_id

let maintain net = net.maintain ()

let roll net = net.roll ()

let shutdown net = net.shutdown ()

let connections net = net.connections ()

let disconnect net = net.disconnect

let find_connection_by_peer_id net = net.find_connection_by_peer_id

let find_connection_by_point net = net.find_connection_by_point

let connection_info net = net.connection_info

let connection_local_metadata net = net.connection_local_metadata

let connection_remote_metadata net = net.connection_remote_metadata

let connection_stat net = net.connection_stat

let global_stat net = net.global_stat ()

let get_peer_metadata net = net.get_peer_metadata

let set_peer_metadata net = net.set_peer_metadata

let connect net = net.connect

let recv net = net.recv

let recv_any net = net.recv_any ()

let send net = net.send

let try_send net = net.try_send

let broadcast net = net.broadcast

let fold_connections net = net.fold_connections

let iter_connections net = net.iter_connections

let on_new_connection net = net.on_new_connection

let greylist_addr net addr =
  Option.iter (fun pool -> P2p_pool.greylist_addr pool addr) net.pool

let greylist_peer net peer_id =
  Option.iter (fun pool -> P2p_pool.greylist_peer pool peer_id) net.pool

let watcher net = Lwt_watcher.create_stream net.watcher
back to top