Raw File
handler.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Trili Tech, <contact@trili.tech>                       *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** A handler daemon: a promise of type [unit tzresult Lwt.t] paired with a
    stopper. This is the shape of the pair [(p, stopper)] built by
    [make_stream_daemon], where [p] resolves when the stream closes and
    [stopper] closes the stream. *)
type t = unit tzresult Lwt.t * Tezos_rpc__RPC_context.stopper

(** [resolve_plugin protocols] looks up a DAC plugin with [Dac_plugin.get] for
    both [protocols.current_protocol] and [protocols.next_protocol], and
    combines the two lookups with [Option.either].

    An event is emitted in either case: the hash of the resolved plugin's
    protocol on success, or both protocol hashes when no plugin is found.
    The resolved plugin, if any, is returned. *)
let resolve_plugin
    (protocols : Tezos_shell_services.Chain_services.Blocks.protocols) =
  let open Lwt_syntax in
  let current = protocols.current_protocol
  and next = protocols.next_protocol in
  match Option.either (Dac_plugin.get current) (Dac_plugin.get next) with
  | Some plugin ->
      (* Unpack the first-class module only to read its protocol hash. *)
      let (module Plugin : Dac_plugin.T) = plugin in
      let* () = Event.emit_protocol_plugin_resolved Plugin.Proto.hash in
      Lwt.return_some plugin
  | None ->
      let* () = Event.emit_protocol_plugin_not_resolved current next in
      Lwt.return_none

(** [make_stream_daemon handle streamed_call] waits for [streamed_call] to
    provide a stream and its stopper, then calls [handle stopper element] on
    each [element] received from the stream.

    An error returned by [handle] is reported through the [daemon_error]
    event and does not interrupt the daemon.

    On success, it returns a couple [(p, stopper)] where [p] is a promise
    resolving when the stream closes and [stopper] a function closing the
    stream. An error from [streamed_call] is returned as is. *)
let make_stream_daemon handle streamed_call =
  let open Lwt_result_syntax in
  let* stream, stopper = streamed_call in
  (* Log a handler failure instead of propagating it. *)
  let report_failure = function
    | Ok () -> Lwt.return_unit
    | Error trace -> Event.(emit daemon_error) trace
  in
  let rec consume () =
    let*! next = Lwt_stream.get stream in
    match next with
    | Some element ->
        let*! outcome = handle stopper element in
        let*! () = report_failure outcome in
        consume ()
    | None ->
        (* The stream is closed: the daemon terminates. *)
        return_unit
  in
  return (consume (), stopper)

(* TODO: https://gitlab.com/tezos/tezos/-/issues/5930
         Dac nodes operators should be able to configure
         [infinite_daemon_init_delay] from the command line. *)

(** [infinite_daemon_init_delay] is the initial delay waited before trying to
    re-establish the connection of a streamed daemon. It is the starting
    value of the [delay] that [make_infinite_stream_daemon] passes to
    [Lwt_unix.sleep], and is therefore expressed in seconds. *)
let infinite_daemon_init_delay = 2.

(** [infinite_daemon_max_delay] is the upper bound on the delay waited before
    trying to re-establish the connection of a streamed daemon: the delay is
    doubled after each failed attempt by [make_infinite_stream_daemon] but
    never exceeds this value. Like [infinite_daemon_init_delay], it is passed
    to [Lwt_unix.sleep], hence expressed in seconds. *)
let infinite_daemon_max_delay = 128.

(** [make_infinite_stream_daemon ~on_disconnect ~on_failed_connection connect]
    creates an ever lasting streamed daemon, by restarting a daemon
    every time the connection is lost or fails to be established.

    In case of a lost connection, we wait [infinite_daemon_init_delay]
    before trying to run the streamed daemon again. Each time the connection
    cannot be established, we wait for the current delay and then double it.
    The waiting time is bounded by [infinite_daemon_max_delay]. Once a
    connection is established and later lost, both the delay and the attempt
    counter are reset.

    - [connect] is a streamed daemon constructor.
    - [~on_disconnect] is used to emit an event when the daemon disconnects.
    - [~on_failed_connection ~count ~delay e] is used to emit an event when
      unable to (re-)establish the connection; [count] is the number of
      consecutive failed attempts so far, [delay] the time about to be waited
      and [e] the connection error.

    If the promise of a running daemon resolves to an error, that error is
    returned and the daemon is not restarted.

    TODO: https://gitlab.com/tezos/tezos/-/issues/5931
          We would want an upper bound in [max_retries] for this function.
          Both [max_retries] and [infinite_daemon_max_delay] would ideally
          be configurable. *)
let make_infinite_stream_daemon ~on_disconnect ~on_failed_connection connect =
  let rec loop ~delay ~count =
    let open Lwt_result_syntax in
    let*! daemon = connect () in
    match daemon with
    | Ok (daemon, stopper) ->
        (* [daemon] promise is resolved when underlying stream closes. E.g.
           this happens when rebooting Coordinator's node. *)
        let* () = daemon in
        let () = stopper () in
        let*! () = on_disconnect () in
        (* Before reconnecting we wait. The connection was established, so we
           use the initial delay rather than [delay], which may have grown
           during failed attempts that preceded this connection. *)
        let*! () = Lwt_unix.sleep infinite_daemon_init_delay in
        loop ~count:0 ~delay:infinite_daemon_init_delay
    | Error e ->
        let*! () = on_failed_connection ~count ~delay e in
        (* Before trying again we wait. *)
        let*! () = Lwt_unix.sleep delay in
        (* We double the previous waiting time, which is bounded by
           [infinite_daemon_max_delay]. *)
        let delay = Float.min (delay *. 2.0) infinite_daemon_max_delay in
        loop ~count:(count + 1) ~delay
  in
  loop ~count:0 ~delay:infinite_daemon_init_delay

(** [resolve_plugin_and_set_ready ctxt] monitors the heads of the [`Main]
    chain of the targeted node and, on each new head received while the node
    status is [Starting], tries to resolve the DAC protocol plugin
    corresponding to the protocols of the targeted node.

    Once a plugin is resolved, [ctxt] is set ready with it, the
    [node_is_ready] event is emitted and the heads stream is stopped. Heads
    received while the status is [Ready _] are ignored. *)
let resolve_plugin_and_set_ready ctxt =
  (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605
     Handle situation where plugin is not found *)
  let open Lwt_result_syntax in
  let cctxt = Node_context.get_tezos_node_cctxt ctxt in
  (* Fetch the protocols of the targeted node and, if a plugin matches, mark
     the node as ready and close the stream through [stopper]. *)
  let try_resolve stopper =
    let* protocols =
      Tezos_shell_services.Chain_services.Blocks.protocols cctxt ()
    in
    let*! resolved = resolve_plugin protocols in
    match resolved with
    | None -> return_unit
    | Some dac_plugin ->
        Node_context.set_ready ctxt dac_plugin ;
        let*! () = Event.(emit node_is_ready ()) in
        stopper () ;
        return_unit
  in
  (* The content of the head is not used: a new head is only a trigger. *)
  let on_head stopper (_block_hash, (_block_header : Tezos_base.Block_header.t))
      =
    match Node_context.get_status ctxt with
    | Starting -> try_resolve stopper
    | Ready _ -> return_unit
  in
  let*! () = Event.(emit layer1_node_tracking_started ()) in
  make_stream_daemon
    on_head
    (Tezos_shell_services.Monitor_services.heads cctxt `Main)

(** The [new_head] handler is shared by all operating modes. This handler is
    responsible for tracking new heads from the Layer 1.

    For each head received while the node status is [Ready _], the
    [layer1_node_new_head] event is emitted with the block hash and level.
    Heads received while the status is [Starting] are ignored. The underlying
    stream is restarted by [make_infinite_stream_daemon] whenever it closes or
    cannot be opened. *)
let new_head ctxt =
  let open Lwt_result_syntax in
  let cctxt = Node_context.get_tezos_node_cctxt ctxt in
  let on_head _stopper (block_hash, (header : Tezos_base.Block_header.t)) =
    match Node_context.get_status ctxt with
    | Starting -> return_unit
    | Ready _ ->
        let level = header.shell.level in
        let*! () = Event.(emit layer1_node_new_head (block_hash, level)) in
        return_unit
  in
  (* Opens a fresh heads stream; called again on every reconnection. *)
  let connect () =
    make_stream_daemon
      on_head
      (Tezos_shell_services.Monitor_services.heads cctxt `Main)
  in
  let*! () = Event.(emit layer1_node_tracking_started ()) in
  (* NOTE(review): unlike the other callbacks passed to
     [make_infinite_stream_daemon] in this file, the one below has no [emit_]
     prefix -- confirm against [Event] that this is the intended function. *)
  make_infinite_stream_daemon
    ~on_disconnect:Event.emit_l1_tracking_ended
    ~on_failed_connection:Event.cannot_connect_to_tezos_node
    connect

(** Handlers specific to a [Committee_member]. A [Committee_member] is
    responsible for
    {ul
      {li Monitoring root hashes from a [Coordinator],}
      {li Downloading the associated pages,}
      {li Validating the hash of each page,}
      {li Signing the final root hash with the key of the committee member,}
      {li Sending the signature back to the [Coordinator].}
    } *)
module Committee_member = struct
  (** [push_payload_signature coordinator_cctxt wallet_cctxt committee_member
      root_hash] signs the bytes of [root_hash] with the secret key of
      [committee_member] (through [wallet_cctxt]), sends the resulting
      signature to the coordinator reachable via [coordinator_cctxt], and
      finally emits an event carrying the signature. Fails if signing or
      pushing the signature fails. *)
  let push_payload_signature coordinator_cctxt wallet_cctxt committee_member
      root_hash =
    let open Lwt_result_syntax in
    let Wallet_account.Committee_member.{public_key_hash; secret_key_uri; _} =
      committee_member
    in
    let* signature =
      Tezos_client_base.Client_keys.aggregate_sign
        wallet_cctxt
        secret_key_uri
        (Dac_plugin.hash_to_bytes root_hash)
    in
    let signature_repr =
      Signature_repr.make
        (Dac_plugin.hash_to_raw root_hash)
        signature
        public_key_hash
    in
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/5627
       Currently we have only one major DAC API version ([V0]). For this
       reason, we can always default to it. This should be revisited once we
       add another major version. *)
    let* () =
      Dac_node_client.V0.put_dac_member_signature
        coordinator_cctxt
        ~signature:signature_repr
    in
    let*! () = Event.emit_signature_pushed_to_coordinator signature in
    return_unit

  (** [new_root_hash ctxt wallet_cctxt dac_plugin page_store] subscribes to
      the coordinator's stream of root hashes. For each root hash received,
      the payload is deserialized through a remote page store built from the
      coordinator's context and [page_store]; on success the root hash is
      signed and the signature pushed to the coordinator, otherwise a failure
      event is emitted. The stream is restarted by
      [make_infinite_stream_daemon] whenever it closes or cannot be opened. *)
  let new_root_hash ctxt wallet_cctxt dac_plugin page_store =
    let open Lwt_result_syntax in
    let coordinator_cctxt =
      ctxt.Node_context.Committee_member.coordinator_cctxt
    in
    let remote_store =
      Page_store.(Remote.init {cctxt = coordinator_cctxt; page_store})
    in
    let on_root_hash _stopper raw_root_hash =
      let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in
      let*! () = Event.emit_new_root_hash_received dac_plugin root_hash in
      let*! payload_result =
        Pages_encoding.Merkle_tree.V0.Remote.deserialize_payload
          dac_plugin
          ~page_store:remote_store
          root_hash
      in
      match payload_result with
      | Error errs ->
          (* TODO: https://gitlab.com/tezos/tezos/-/issues/4930.
             Improve handling of errors. *)
          let*! () =
            Event.emit_processing_root_hash_failed dac_plugin root_hash errs
          in
          return_unit
      | Ok _ ->
          (* The payload itself is not needed here: a successful
             deserialization is what triggers the signature. *)
          let*! () =
            Event.emit_received_root_hash_processed dac_plugin root_hash
          in
          push_payload_signature
            coordinator_cctxt
            wallet_cctxt
            ctxt.Node_context.Committee_member.committee_member
            root_hash
    in
    (* Opens a fresh root hashes stream; called again on every
       reconnection. *)
    let connect () =
      make_stream_daemon
        on_root_hash
        (Monitor_services.V0.root_hashes coordinator_cctxt)
    in
    let*! () = Event.(emit subscribed_to_root_hashes_stream ()) in
    make_infinite_stream_daemon
      ~on_disconnect:Event.emit_coordinators_connection_lost
      ~on_failed_connection:Event.emit_cannot_connect_to_coordinator
      connect
end

(** Handlers specific to an [Observer]. An [Observer] is responsible for
    {ul
      {li Monitoring root hashes from a [Coordinator],}
      {li Downloading the associated pages,}
      {li Validating the hash of each page.}
    } *)
module Observer = struct
  (** [new_root_hash ctxt dac_plugin page_store] subscribes to the
      coordinator's stream of root hashes. For each root hash received, the
      payload is deserialized through a remote page store built from the
      coordinator's context and [page_store], and an event reporting the
      success or the failure of that processing is emitted. The stream is
      restarted by [make_infinite_stream_daemon] whenever it closes or cannot
      be opened. *)
  let new_root_hash ctxt dac_plugin page_store =
    let open Lwt_result_syntax in
    let coordinator_cctxt = ctxt.Node_context.Observer.coordinator_cctxt in
    let remote_store =
      Page_store.(Remote.init {cctxt = coordinator_cctxt; page_store})
    in
    let on_root_hash _stopper raw_root_hash =
      let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in
      let*! () = Event.emit_new_root_hash_received dac_plugin root_hash in
      let*! payload_result =
        Pages_encoding.Merkle_tree.V0.Remote.deserialize_payload
          dac_plugin
          ~page_store:remote_store
          root_hash
      in
      let*! () =
        match payload_result with
        | Ok _ -> Event.emit_received_root_hash_processed dac_plugin root_hash
        | Error errs ->
            (* TODO: https://gitlab.com/tezos/tezos/-/issues/4930.
               Improve handling of errors. *)
            Event.emit_processing_root_hash_failed dac_plugin root_hash errs
      in
      return_unit
    in
    (* Opens a fresh root hashes stream; called again on every
       reconnection. *)
    let connect () =
      make_stream_daemon
        on_root_hash
        (Monitor_services.V0.root_hashes coordinator_cctxt)
    in
    let*! () = Event.(emit subscribed_to_root_hashes_stream ()) in
    make_infinite_stream_daemon
      ~on_disconnect:Event.emit_coordinators_connection_lost
      ~on_failed_connection:Event.emit_cannot_connect_to_coordinator
      connect
end

(** [handlers node_ctxt] returns the list of handler daemons to run for the
    operating mode of [node_ctxt]: the [new_head] handler in every mode,
    followed by the [new_root_hash] handler of [Committee_member] or
    [Observer] in the corresponding modes. Fails if
    [Node_context.get_dac_plugin] returns an error. *)
let handlers node_ctxt =
  let open Lwt_result_syntax in
  let*? plugin = Node_context.get_dac_plugin node_ctxt in
  let page_store = Node_context.get_page_store node_ctxt in
  let wallet_cctxt = Node_context.get_tezos_node_cctxt node_ctxt in
  (* Handlers that only exist in some operating modes. *)
  let mode_specific =
    match Node_context.get_mode node_ctxt with
    | Coordinator _ -> []
    | Committee_member ctxt ->
        [Committee_member.new_root_hash ctxt wallet_cctxt plugin page_store]
    | Observer ctxt -> [Observer.new_root_hash ctxt plugin page_store]
  in
  return (new_head node_ctxt :: mode_specific)
back to top