https://gitlab.com/tezos/tezos
Raw File
Tip revision: c09094fbbbe85b89dc825ccd28bcb9ebc30ab10a authored by Gauthier SEBILLE on 23 May 2023, 14:27:30 UTC
DAC: naive implementation of Certificate_repr
Tip revision: c09094f
RPC_server.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022-2023 Trili Tech <contact@trili.tech>                   *)
(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com>                *)
(* Copyright (c) 2023 Marigold  <contact@marigold.dev>                      *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Tezos_rpc_http
open Tezos_rpc_http_server

type error +=
  | Cannot_construct_external_message
  | Cannot_deserialize_external_message
  | DAC_node_not_ready of string

let () =
  register_error_kind
    `Permanent
    ~id:"dac_cannot_construct_external_message"
    ~title:"External rollup message could not be constructed"
    ~description:"External rollup message could not be constructed"
    ~pp:(fun ppf () ->
      Format.fprintf ppf "External rollup message could not be constructed")
    Data_encoding.unit
    (function Cannot_construct_external_message -> Some () | _ -> None)
    (fun () -> Cannot_construct_external_message) ;
  register_error_kind
    `Permanent
    ~id:"dac_cannot_deserialize_rollup_external_message"
    ~title:"External rollup message could not be deserialized"
    ~description:"External rollup message could not be deserialized"
    ~pp:(fun ppf () ->
      Format.fprintf ppf "External rollup message could not be deserialized")
    Data_encoding.unit
    (function Cannot_deserialize_external_message -> Some () | _ -> None)
    (fun () -> Cannot_deserialize_external_message) ;
  register_error_kind
    `Permanent
    ~id:"dac_node_not_ready"
    ~title:"DAC Node is not ready"
    ~description:
      "RPC server of DAC node is not started and plugin is not resolved."
    ~pp:(fun ppf message ->
      Format.fprintf ppf "DAC Node is not ready, current status is: %s" message)
    Data_encoding.(obj1 (req "value" string))
    (function DAC_node_not_ready message -> Some message | _ -> None)
    (fun message -> DAC_node_not_ready message)

let add_service registerer service handler directory =
  registerer directory service handler

let handle_get_health_live node_ctxt =
  match Node_context.get_status node_ctxt with
  | Ready _ | Starting -> Lwt_result_syntax.return true

let handle_get_health_ready node_ctxt =
  match Node_context.get_status node_ctxt with
  | Ready _ -> Lwt_result_syntax.return true
  | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting"

let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store
    hash_streamer (data, pagination_scheme) =
  let open Lwt_result_syntax in
  let open Pages_encoding in
  let* root_hash =
    match pagination_scheme with
    | Pagination_scheme.Merkle_tree_V0 ->
        (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897
           Once new "PUT /preimage" endpoint is implemented, pushing
           a new root hash to the data streamer should be moved there.
           Tezt for testing streaming of root hashes should also use
           the new endpoint. *)
        let* root_hash =
          Merkle_tree.V0.Filesystem.serialize_payload
            dac_plugin
            ~page_store
            data
        in
        let () =
          Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash)
        in
        let*! () =
          Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash
        in
        return root_hash
    | Pagination_scheme.Hash_chain_V0 ->
        Hash_chain.V0.serialize_payload
          dac_plugin
          ~for_each_page:(fun (hash, content) ->
            Page_store.Filesystem.save dac_plugin page_store ~hash ~content)
          data
  in
  let* signature, witnesses =
    Signature_manager.Legacy.sign_root_hash
      dac_plugin
      cctxt
      dac_sk_uris
      root_hash
  in
  let raw_root_hash = Dac_plugin.hash_to_raw root_hash in
  let*! external_message =
    External_message.Default.make dac_plugin root_hash signature witnesses
  in
  match external_message with
  | Ok external_message -> return @@ (raw_root_hash, external_message)
  | Error _ -> tzfail @@ Cannot_construct_external_message

let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message =
  let open Lwt_result_syntax in
  let ((module Plugin) : Dac_plugin.t) = dac_plugin in
  let external_message =
    let open Option_syntax in
    let* encoded_l1_message in
    let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in
    External_message.Default.of_bytes Plugin.encoding as_bytes
  in
  match external_message with
  | None -> tzfail @@ Cannot_deserialize_external_message
  | Some {root_hash; signature; witnesses} ->
      Signature_manager.verify
        dac_plugin
        ~public_keys_opt
        (Dac_plugin.hash_to_raw root_hash)
        signature
        witnesses

(** [handle_get_preimage] is shared by both [V0] and [V1] API. *)
let handle_get_preimage dac_plugin page_store raw_hash =
  let open Lwt_result_syntax in
  let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in
  Page_store.Filesystem.load dac_plugin page_store hash

(* Handler for subscribing to the streaming of root hashes via
   GET monitor/root_hashes RPC call. *)
let handle_monitor_root_hashes hash_streamer =
  let open Lwt_syntax in
  let stream, stopper = Data_streamer.handle_subscribe hash_streamer in
  let shutdown () = Lwt_watcher.shutdown stopper in
  let next () = Lwt_stream.get stream in
  let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in
  Tezos_rpc.Answer.return_stream {next; shutdown}

let handle_get_certificate dac_plugin node_store raw_root_hash =
  let open Lwt_result_syntax in
  let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in

  let+ value_opt = Store.Certificate_store.find node_store root_hash in
  Option.map
    (fun Store.{aggregate_signature; witnesses} ->
      Certificate_repr.(
        V0 (V0.make raw_root_hash aggregate_signature witnesses)))
    value_opt

let handle_get_missing_page cctxt page_store dac_plugin raw_root_hash =
  let open Lwt_result_syntax in
  let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in
  let remote_store = Page_store.Remote.(init {cctxt; page_store}) in
  let* preimage =
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/5142
        Retrieve missing page from dac committee via "flooding". *)
    Page_store.Remote.load dac_plugin remote_store root_hash
  in
  let*! () = Event.(emit fetched_missing_page raw_root_hash) in
  return preimage

let register_get_health_live cctxt directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       RPC_services.get_health_live
       (fun () () -> handle_get_health_live cctxt)

let register_get_health_ready cctxt directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       RPC_services.get_health_ready
       (fun () () -> handle_get_health_ready cctxt)

let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer
    directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       RPC_services.V0.post_store_preimage
       (fun () input ->
         handle_post_store_preimage
           ctx
           cctxt
           dac_sk_uris
           page_store
           hash_streamer
           input)

let register_get_verify_signature dac_plugin public_keys_opt directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       RPC_services.V0.get_verify_signature
       (fun external_message () ->
         handle_get_verify_signature dac_plugin public_keys_opt external_message)

let register_get_preimage dac_plugin page_store =
  add_service
    Tezos_rpc.Directory.register1
    RPC_services.V0.get_preimage
    (fun hash () () -> handle_get_preimage dac_plugin page_store hash)

let register_monitor_root_hashes hash_streamer dir =
  Tezos_rpc.Directory.gen_register
    dir
    Monitor_services.V0.S.root_hashes
    (fun () () () -> handle_monitor_root_hashes hash_streamer)

let register_get_certificate node_store dac_plugin =
  add_service
    Tezos_rpc.Directory.register1
    RPC_services.V0.get_certificate
    (fun root_hash () () ->
      handle_get_certificate dac_plugin node_store root_hash)

let register_get_missing_page dac_plugin page_store cctxt =
  add_service
    Tezos_rpc.Directory.register1
    RPC_services.V0.get_missing_page
    (fun root_hash () () ->
      handle_get_missing_page cctxt page_store dac_plugin root_hash)

let register_get_pages dac_plugin page_store =
  add_service
    Tezos_rpc.Directory.register1
    RPC_services.V1.get_pages
    (fun hash () () -> handle_get_preimage dac_plugin page_store hash)

module Coordinator = struct
  let handle_post_preimage dac_plugin page_store hash_streamer payload =
    let open Lwt_result_syntax in
    let* root_hash =
      Pages_encoding.Merkle_tree.V0.Filesystem.serialize_payload
        dac_plugin
        ~page_store
        payload
    in
    let () =
      Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash)
    in
    let*! () =
      Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash
    in
    return @@ Dac_plugin.hash_to_raw root_hash

  let handle_monitor_certificate dac_plugin ro_node_store certificate_streamers
      raw_root_hash committee_members =
    let open Lwt_result_syntax in
    let*? stream, stopper =
      Certificate_streamers.handle_subscribe
        dac_plugin
        certificate_streamers
        raw_root_hash
    in
    let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in
    let*! () = Event.emit_new_subscription_to_certificate_updates root_hash in
    let shutdown () = Lwt_watcher.shutdown stopper in
    let next () = Lwt_stream.get stream in
    (* Add the current certificate to the streamer, if any, to ensure that
       a certificate is returned even in the case that no updates to the
       certificate happen for a long time. *)
    let*! current_certificate_store_value_res =
      Store.Certificate_store.find ro_node_store root_hash
    in
    match current_certificate_store_value_res with
    | Ok current_certificate_store_value ->
        let () =
          Option.iter
            (fun Store.{aggregate_signature; witnesses} ->
              let certificate =
                Certificate_repr.(
                  V0 (V0.make raw_root_hash aggregate_signature witnesses))
              in
              let _ =
                Certificate_streamers.push
                  dac_plugin
                  certificate_streamers
                  raw_root_hash
                  certificate
              in
              if
                Certificate_repr.all_committee_members_have_signed
                  committee_members
                  certificate
              then
                let _ =
                  Certificate_streamers.close
                    dac_plugin
                    certificate_streamers
                    raw_root_hash
                in
                ()
              else ())
            current_certificate_store_value
        in
        return (next, shutdown)
    | Error e -> fail e

  let register_monitor_certificate dac_plugin ro_node_store
      certificate_streamers committee_members dir =
    Tezos_rpc.Directory.gen_register
      dir
      Monitor_services.V0.S.certificate
      (fun ((), root_hash) () () ->
        let open Lwt_result_syntax in
        let*! handler =
          handle_monitor_certificate
            dac_plugin
            ro_node_store
            certificate_streamers
            root_hash
            committee_members
        in
        match handler with
        | Ok (next, shutdown) -> Tezos_rpc.Answer.return_stream {next; shutdown}
        | Error e -> Tezos_rpc.Answer.fail e)

  let register_post_preimage dac_plugin hash_streamer page_store =
    add_service
      Tezos_rpc.Directory.register0
      RPC_services.V0.Coordinator.post_preimage
      (fun () payload ->
        handle_post_preimage dac_plugin page_store hash_streamer payload)

  let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store
      =
    add_service
      Tezos_rpc.Directory.register0
      RPC_services.V0.put_dac_member_signature
      (fun () dac_member_signature ->
        Signature_manager.Coordinator.handle_put_dac_member_signature
          ctx
          dac_plugin
          rw_node_store
          page_store
          dac_member_signature)

  let dynamic_rpc_dir dac_plugin rw_store page_store coordinator_node_ctxt =
    let hash_streamer =
      coordinator_node_ctxt.Node_context.Coordinator.hash_streamer
    in
    let certificate_streamers = coordinator_node_ctxt.certificate_streamers in
    let committee_members = coordinator_node_ctxt.committee_members in
    Tezos_rpc.Directory.empty
    |> register_post_preimage dac_plugin hash_streamer page_store
    |> register_get_preimage dac_plugin page_store
    |> register_monitor_root_hashes hash_streamer
    |> register_monitor_certificate
         dac_plugin
         rw_store
         certificate_streamers
         committee_members
    |> register_put_dac_member_signature
         coordinator_node_ctxt
         dac_plugin
         rw_store
         page_store
    |> register_get_certificate rw_store dac_plugin
    |> register_get_pages dac_plugin page_store
end

module Committee_member = struct
  let dynamic_rpc_dir dac_plugin page_store =
    Tezos_rpc.Directory.empty
    |> register_get_preimage dac_plugin page_store
    |> register_get_pages dac_plugin page_store
end

module Observer = struct
  let dynamic_rpc_dir dac_plugin coordinator_cctxt page_store =
    Tezos_rpc.Directory.empty
    |> register_get_preimage dac_plugin page_store
    |> register_get_missing_page dac_plugin page_store coordinator_cctxt
    |> register_get_pages dac_plugin page_store
end

module Legacy = struct
  let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store
      =
    add_service
      Tezos_rpc.Directory.register0
      RPC_services.V0.put_dac_member_signature
      (fun () dac_member_signature ->
        Signature_manager.Legacy.handle_put_dac_member_signature
          ctx
          dac_plugin
          rw_node_store
          page_store
          dac_member_signature)

  let dynamic_rpc_dir dac_plugin rw_store page_store cctxt legacy_node_ctxt =
    let hash_streamer = legacy_node_ctxt.Node_context.Legacy.hash_streamer in
    let public_keys_opt =
      Node_context.Legacy.public_keys_opt legacy_node_ctxt
    in
    let secret_key_uris_opt =
      Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt
    in
    let register_get_missing_page =
      match legacy_node_ctxt.coordinator_cctxt with
      | None -> fun dir -> dir
      | Some cctxt ->
          fun dir ->
            dir |> register_get_missing_page dac_plugin page_store cctxt
    in
    Tezos_rpc.Directory.empty
    |> register_post_store_preimage
         dac_plugin
         cctxt
         secret_key_uris_opt
         page_store
         hash_streamer
    |> register_get_verify_signature dac_plugin public_keys_opt
    |> register_get_preimage dac_plugin page_store
    |> register_monitor_root_hashes hash_streamer
    |> register_put_dac_member_signature
         legacy_node_ctxt
         dac_plugin
         rw_store
         page_store
    |> register_get_certificate rw_store dac_plugin
    |> register_get_missing_page
end

let start ~rpc_address ~rpc_port node_ctxt =
  let open Lwt_syntax in
  let rw_store = Node_context.get_node_store node_ctxt Store_sigs.Read_write in
  let page_store = Node_context.get_page_store node_ctxt in
  let cctxt = Node_context.get_tezos_node_cctxt node_ctxt in
  let register_dynamic_rpc dac_plugin =
    match Node_context.get_mode node_ctxt with
    | Coordinator coordinator_node_ctxt ->
        Coordinator.dynamic_rpc_dir
          dac_plugin
          rw_store
          page_store
          coordinator_node_ctxt
    | Committee_member _committee_member_node_ctxt ->
        Committee_member.dynamic_rpc_dir dac_plugin page_store
    | Observer {coordinator_cctxt; _} ->
        Observer.dynamic_rpc_dir dac_plugin coordinator_cctxt page_store
    | Legacy legacy_node_ctxt ->
        Legacy.dynamic_rpc_dir
          dac_plugin
          rw_store
          page_store
          cctxt
          legacy_node_ctxt
  in
  let register_health_endpoints dir =
    dir
    |> register_get_health_ready node_ctxt
    |> register_get_health_live node_ctxt
  in
  let dir =
    Tezos_rpc.Directory.register_dynamic_directory
      Tezos_rpc.Directory.empty
      Tezos_rpc.Path.open_root
      (fun () ->
        match Node_context.get_status node_ctxt with
        | Ready {dac_plugin = (module Dac_plugin)} ->
            Lwt.return
              (register_dynamic_rpc (module Dac_plugin)
              |> register_health_endpoints)
        | Starting ->
            Lwt.return (Tezos_rpc.Directory.empty |> register_health_endpoints))
  in
  let rpc_address = P2p_addr.of_string_exn rpc_address in
  let host = Ipaddr.V6.to_string rpc_address in
  let node = `TCP (`Port rpc_port) in
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/5613
     Check if proper ACL handling is needed. *)
  let acl = RPC_server.Acl.allow_all in
  let server =
    RPC_server.init_server dir ~acl ~media_types:Media_type.all_media_types
  in
  Lwt.catch
    (fun () ->
      let* () =
        RPC_server.launch
          ~host
          server
          ~callback:(RPC_server.resto_callback server)
          node
      in
      let* () = Event.emit_rpc_started () in
      return_ok server)
    fail_with_exn

let shutdown = RPC_server.shutdown

let install_finalizer rpc_server =
  let open Lwt_syntax in
  Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status ->
  let* () = shutdown rpc_server in
  let* () = Event.(emit shutdown_node exit_status) in
  Tezos_base_unix.Internal_event_unix.close ()
back to top