swh:1:snp:505c374fd75bb208ae4e9a54e64bb310bc49295e
Raw File
Tip revision: c3d3ea134734d68ea7598606bddad64f21d1a6c2 authored by ovidiu deac on 19 August 2022, 07:28:50 UTC
use ppx_deriving.show to generate pp_values_op
Tip revision: c3d3ea1
p2p_conn.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2019-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Events = P2p_events.P2p_conn

type ('msg, 'peer, 'conn) t = {
  canceler : Lwt_canceler.t;
  greylister : unit -> unit;
  messages : (int * 'msg) Lwt_pipe.Maybe_bounded.t;
  conn : ('msg P2p_message.t, 'conn) P2p_socket.t;
  peer_info : (('msg, 'peer, 'conn) t, 'peer, 'conn) P2p_peer_state.Info.t;
  point_info : ('msg, 'peer, 'conn) t P2p_point_state.Info.t option;
  negotiated_version : Network_version.t;
  mutable last_sent_swap_request : (Time.System.t * P2p_peer.Id.t) option;
  mutable wait_close : bool;
  mutable worker : unit Lwt.t option;
  peer_id : P2p_peer.Id.t;
  trusted_node : bool;
  private_node : bool;
}

let rec worker_loop (t : ('msg, 'peer, 'conn) t) callback =
  let open Lwt_syntax in
  let open P2p_answerer in
  let request_info =
    P2p_answerer.{last_sent_swap_request = t.last_sent_swap_request}
  in
  let* () = Lwt.pause () in
  let* r = protect ~canceler:t.canceler (fun () -> P2p_socket.read t.conn) in
  match r with
  | Ok (_, Bootstrap) -> (
      let* r = callback.bootstrap request_info in
      match r with
      | Ok () -> worker_loop t callback
      | Error _ -> Error_monad.cancel_with_exceptions t.canceler)
  | Ok (_, Advertise points) ->
      let* () = callback.advertise request_info points in
      worker_loop t callback
  | Ok (_, Swap_request (point, peer)) ->
      let* () = callback.swap_request request_info point peer in
      worker_loop t callback
  | Ok (_, Swap_ack (point, peer)) ->
      let* () = callback.swap_ack request_info point peer in
      worker_loop t callback
  | Ok (size, Message msg) ->
      let* () = callback.message request_info size msg in
      worker_loop t callback
  | Ok (_, Disconnect) | Error (P2p_errors.Connection_closed :: _) ->
      Error_monad.cancel_with_exceptions t.canceler
  | Error (P2p_errors.Decoding_error _ :: _) ->
      t.greylister () ;
      Error_monad.cancel_with_exceptions t.canceler
  | Error (Canceled :: _) -> Lwt.return_unit
  | Error err ->
      let* () = Events.(emit unexpected_error) err in
      Error_monad.cancel_with_exceptions t.canceler

let shutdown t =
  let open Lwt_syntax in
  match t.worker with
  | None -> Lwt.return_unit
  | Some w ->
      let* () = Error_monad.cancel_with_exceptions t.canceler in
      w

let write_swap_ack t point peer_id =
  P2p_socket.write_now t.conn (Swap_ack (point, peer_id))

let write_advertise t points = P2p_socket.write_now t.conn (Advertise points)

let create ~conn ~point_info ~peer_info ~messages ~canceler ~greylister
    ~callback negotiated_version =
  let private_node = P2p_socket.private_node conn in
  let trusted_node =
    P2p_peer_state.Info.trusted peer_info
    || Option.fold ~none:false ~some:P2p_point_state.Info.trusted point_info
  in
  let peer_id = peer_info |> P2p_peer_state.Info.peer_id in
  let t =
    {
      conn;
      point_info;
      peer_info;
      messages;
      canceler;
      greylister;
      wait_close = false;
      last_sent_swap_request = None;
      negotiated_version;
      worker = None;
      peer_id;
      private_node;
      trusted_node;
    }
  in
  let conn_info =
    P2p_answerer.
      {
        peer_id = t.peer_info |> P2p_peer_state.Info.peer_id;
        is_private = P2p_socket.private_node t.conn;
        write_advertise = write_advertise t;
        write_swap_ack = write_swap_ack t;
        messages;
      }
  in
  t.worker <-
    Some
      (Lwt_utils.worker
         "answerer"
         ~on_event:Internal_event.Lwt_worker_event.on_event
         ~run:(fun () -> worker_loop t (callback conn_info))
         ~cancel:(fun () -> Error_monad.cancel_with_exceptions t.canceler)) ;
  t

let pipe_exn_handler = function
  | Lwt_pipe.Closed -> Lwt_result_syntax.tzfail P2p_errors.Connection_closed
  | exc -> Lwt.fail exc

(* see [Lwt_pipe.Maybe_bounded.pop] *)

let read t =
  let open Lwt_syntax in
  Lwt.catch
    (fun () ->
      let* s, msg = Lwt_pipe.Maybe_bounded.pop t.messages in
      let* () =
        Events.(emit bytes_popped_from_queue)
          (s, (P2p_socket.info t.conn).peer_id)
      in
      return_ok msg)
    pipe_exn_handler

let is_readable t =
  let open Lwt_result_syntax in
  Lwt.catch
    (fun () ->
      let*! _ = Lwt_pipe.Maybe_bounded.peek t.messages in
      return_unit)
    pipe_exn_handler

let write t msg = P2p_socket.write t.conn (Message msg)

let write_sync t msg = P2p_socket.write_sync t.conn (Message msg)

let write_now t msg = P2p_socket.write_now t.conn (Message msg)

let write_swap_request t point peer_id =
  t.last_sent_swap_request <- Some (Time.System.now (), peer_id) ;
  P2p_socket.write_now t.conn (Swap_request (point, peer_id))

let write_bootstrap t = P2p_socket.write_now t.conn Bootstrap

let stat t = P2p_socket.stat t.conn

let info t = P2p_socket.info t.conn

let local_metadata t = P2p_socket.local_metadata t.conn

let remote_metadata t = P2p_socket.remote_metadata t.conn

let disconnect ?(wait = false) t =
  t.wait_close <- wait ;
  shutdown t

let close t = P2p_socket.close ~wait:t.wait_close t.conn

let equal_sock t t' = P2p_socket.equal t.conn t'.conn

let private_node t = t.private_node

let peer_id t = t.peer_id

let trusted_node t = t.trusted_node

let negotiated_version t = t.negotiated_version

module Internal_for_tests = struct
  let raw_write_sync t buf =
    P2p_socket.Internal_for_tests.raw_write_sync t.conn buf
end
back to top