p2p_protocol.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2021 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
module Events = P2p_events.P2p_protocol
type ('msg, 'peer, 'conn) config = {
swap_linger : Time.System.Span.t option;
pool : ('msg, 'peer, 'conn) P2p_pool.t;
log : P2p_connection.P2p_event.t -> unit;
connect : P2p_point.Id.t -> ('msg, 'peer, 'conn) P2p_conn.t tzresult Lwt.t;
mutable latest_accepted_swap : Time.System.t;
mutable latest_successful_swap : Time.System.t;
}
open P2p_answerer
let message conn _request size msg =
Lwt_pipe.Maybe_bounded.push conn.messages (size, msg)
module Private_answerer = struct
let advertise conn _request _points =
Prometheus.Counter.inc_one P2p_metrics.Messages.advertise_received ;
Events.(emit private_node_new_peers) conn.peer_id
let bootstrap conn _request =
Prometheus.Counter.inc_one P2p_metrics.Messages.bootstrap_received ;
Lwt_result.ok @@ Events.(emit private_node_peers_request) conn.peer_id
let swap_request conn _request _new_point _peer =
Prometheus.Counter.inc_one P2p_metrics.Messages.swap_request_received ;
Events.(emit private_node_swap_request) conn.peer_id
let swap_ack conn _request _point _peer_id =
Prometheus.Counter.inc_one P2p_metrics.Messages.swap_ack_received ;
Events.(emit private_node_swap_ack) conn.peer_id
let create conn =
P2p_answerer.
{
message = message conn;
advertise = advertise conn;
bootstrap = bootstrap conn;
swap_request = swap_request conn;
swap_ack = swap_ack conn;
}
end
module Default_answerer = struct
open P2p_connection.P2p_event
let advertise config conn _request points =
let log = config.log in
let source_peer_id = conn.peer_id in
log (Advertise_received {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.advertise_received ;
P2p_pool.register_list_of_new_points
~medium:"advertise"
~source:conn.peer_id
config.pool
points
let bootstrap config conn _request_info =
let open Lwt_result_syntax in
let log = config.log in
let source_peer_id = conn.peer_id in
log (Bootstrap_received {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.bootstrap_received ;
if conn.is_private then
let*! () = Events.(emit private_node_request) conn.peer_id in
return_unit
else
let*! points =
P2p_pool.list_known_points ~ignore_private:true config.pool
in
match points with
| [] -> return_unit
| points -> (
match conn.write_advertise points with
| Ok true ->
log (Advertise_sent {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.advertise_sent ;
return_unit
| Ok false ->
(* TODO: https://gitlab.com/tezos/tezos/-/issues/4594
if not sent then ?? TODO count dropped message ?? *)
return_unit
| Error err as error ->
let*! () =
Events.(emit advertise_sending_failed) (source_peer_id, err)
in
Lwt.return error)
let swap t pool source_peer_id ~connect current_peer_id new_point =
let open Lwt_syntax in
t.latest_accepted_swap <- Time.System.now () ;
let* r = connect new_point in
match r with
| Ok _new_conn -> (
t.latest_successful_swap <- Time.System.now () ;
t.log (Swap_success {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Swap.success ;
let* () = Events.(emit swap_succeeded) new_point in
match P2p_pool.Connection.find_by_peer_id pool current_peer_id with
| None -> return_unit
| Some conn -> P2p_conn.disconnect conn)
| Error err -> (
t.latest_accepted_swap <- t.latest_successful_swap ;
t.log (Swap_failure {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Swap.fail ;
match err with
| [Timeout] -> Events.(emit swap_interrupted) (new_point, err)
| _ -> Events.(emit swap_failed) (new_point, err))
let swap_ack config conn request new_point _peer =
let open Lwt_syntax in
let source_peer_id = conn.peer_id in
let pool = config.pool in
let connect = config.connect in
let log = config.log in
log (Swap_ack_received {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.swap_ack_received ;
let do_swap =
let open Option_syntax in
let* _ = config.swap_linger (* Don't answer if swap is disabled. *) in
let* _time, proposed_peer_id =
(* Checks that a swap request has been sent to this connection. *)
request.last_sent_swap_request
in
P2p_pool.Connection.find_by_point pool new_point
(* Ignore the swap if the new point is already connected *)
(* FIXME: https://gitlab.com/tezos/tezos/-/issues/5211
Should we accept the swap if we are not yet connected
to the node, but already in the process of connecting to
it? This can raise race conditions. *)
|> Option.fold_f
~some:(fun _ -> None)
~none:(fun () ->
Some
(swap
config
pool
source_peer_id
~connect
proposed_peer_id
new_point))
in
Option.value ~default:return_unit do_swap
let swap_request config conn _request new_point _peer =
let open Result_syntax in
let source_peer_id = conn.peer_id in
let pool = config.pool in
let connect = config.connect in
let log = config.log in
log (Swap_request_received {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.swap_request_received ;
let do_swap =
let* swap_linger =
config.swap_linger |> Option.to_result ~none:`Swap_is_disabled
in
(* Ignore if already connected to peer or already swapped less than <swap_linger> ago. *)
let span_since_last_swap =
Ptime.diff
(Time.System.now ())
(Time.System.max
config.latest_successful_swap
config.latest_accepted_swap)
in
(* We don't need to register the point here.
Registering the point is the responsibility of the
[P2p_connect_handler]. Registering the point will
eventually be done in [P2p_connect_handler.connect].
Moreover, registering the point here could lead to
chaotic interactions with the maintainance. *)
let new_point_info = P2p_pool.Points.info pool new_point in
if
Ptime.Span.compare span_since_last_swap swap_linger < 0
|| not
(Option.fold
~none:true
~some:(fun new_point_info ->
P2p_point_state.is_disconnected new_point_info)
new_point_info)
then (
log (Swap_request_ignored {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Swap.ignored ;
return @@ Events.(emit swap_request_ignored) source_peer_id)
else
let* source_conn =
P2p_pool.Connection.find_by_peer_id pool source_peer_id
|> Option.to_result ~none:`Couldnt_find_by_peer
in
let* proposed_point, proposed_peer_id =
P2p_pool.Connection.random_addr
pool
~different_than:source_conn
~no_private:true
|> Option.to_result ~none:(`No_swap_candidate source_peer_id)
in
let* () =
let* sent_succeeded =
conn.write_swap_ack proposed_point proposed_peer_id
|> Result.map_error (fun _ ->
`Couldnt_write_swap_ack "Connection closed")
in
if sent_succeeded then return_unit
else
fail @@ `Couldnt_write_swap_ack "Buffer is full. Message dropped."
in
log (Swap_ack_sent {source = source_peer_id}) ;
Prometheus.Counter.inc_one P2p_metrics.Messages.swap_ack_sent ;
return
@@ swap config pool source_peer_id ~connect proposed_peer_id new_point
in
(* TODO: https://gitlab.com/tezos/tezos/-/issues/5187
Handle silently ignored error cases. *)
Result.fold
~ok:Fun.id
~error:(function
| `No_swap_candidate source_peer_id ->
Events.(emit no_swap_candidate) source_peer_id
| `Couldnt_find_by_peer
(* The connection has been lost so ignore the request *)
| `Swap_is_disabled | `Couldnt_write_swap_ack _ ->
Lwt.return_unit)
do_swap
let create config conn =
P2p_answerer.
{
message = message conn;
advertise = advertise config conn;
bootstrap = bootstrap config conn;
swap_request = swap_request config conn;
swap_ack = swap_ack config conn;
}
end
let create_default = Default_answerer.create
let create_private () = Private_answerer.create