https://gitlab.com/tezos/tezos
Raw File
Tip revision: 9d1ae795bd468a60a0d0e3d94c27328dd56ebd48 authored by Andrea Cerone on 22 March 2023, 14:24:14 UTC
Dac/Node: emit event when page from stream is saved
Tip revision: 9d1ae79
RPC_server.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022-2023 Trili Tech <contact@trili.tech>                   *)
(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com>                *)
(* Copyright (c) 2023 Marigold  <contact@tmarigold.dev>                      *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Tezos_rpc_http
open Tezos_rpc_http_server

type error +=
  | Cannot_construct_external_message
  | Cannot_deserialize_external_message

let () =
  register_error_kind
    `Permanent
    ~id:"dac_cannot_construct_external_message"
    ~title:"External rollup message could not be constructed"
    ~description:"External rollup message could not be constructed"
    ~pp:(fun ppf () ->
      Format.fprintf ppf "External rollup message could not be constructed")
    Data_encoding.unit
    (function Cannot_construct_external_message -> Some () | _ -> None)
    (fun () -> Cannot_construct_external_message) ;
  register_error_kind
    `Permanent
    ~id:"dac_cannot_deserialize_rollup_external_message"
    ~title:"External rollup message could not be deserialized"
    ~description:"External rollup message could not be deserialized"
    ~pp:(fun ppf () ->
      Format.fprintf ppf "External rollup message could not be deserialized")
    Data_encoding.unit
    (function Cannot_deserialize_external_message -> Some () | _ -> None)
    (fun () -> Cannot_deserialize_external_message)

let add_service registerer service handler directory =
  registerer directory service handler

let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store
    hash_streamer (data, pagination_scheme) =
  let open Lwt_result_syntax in
  let open Pages_encoding in
  let* root_hash =
    match pagination_scheme with
    | Pagination_scheme.Merkle_tree_V0 ->
        (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897
           Once new "PUT /preimage" endpoint is implemented, pushing
           a new root hash to the data streamer should be moved there.
           Tezt for testing streaming of root hashes should also use
           the new endpoint. *)
        let* root_hash =
          Merkle_tree.V0.Streaming.serialize_payload dac_plugin ~page_store data
        in
        let () = Data_streamer.publish hash_streamer root_hash in
        let*! () =
          Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash
        in
        return root_hash
    | Pagination_scheme.Hash_chain_V0 ->
        Hash_chain.V0.serialize_payload
          dac_plugin
          ~for_each_page:(fun (hash, content) ->
            Page_store.Streaming_page_store.save
              dac_plugin
              page_store
              ~hash
              ~content)
          data
  in
  let* signature, witnesses =
    Signature_manager.sign_root_hash dac_plugin cctxt dac_sk_uris root_hash
  in
  let*! external_message =
    External_message.Default.make dac_plugin root_hash signature witnesses
  in
  match external_message with
  | Ok external_message -> return @@ (root_hash, external_message)
  | Error _ -> tzfail @@ Cannot_construct_external_message

let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message =
  let open Lwt_result_syntax in
  let external_message =
    let open Option_syntax in
    let* encoded_l1_message in
    let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in
    let ((module P) : Dac_plugin.t) = dac_plugin in
    External_message.Default.of_bytes P.encoding as_bytes
  in
  match external_message with
  | None -> tzfail @@ Cannot_deserialize_external_message
  | Some {root_hash; signature; witnesses} ->
      Signature_manager.verify
        dac_plugin
        ~public_keys_opt
        root_hash
        signature
        witnesses

let handle_get_preimage dac_plugin page_store hash =
  Page_store.Filesystem.load dac_plugin page_store hash

(* Handler for subscribing to the streaming of root hashes via
   GET monitor/root_hashes RPC call. *)
let handle_monitor_root_hashes hash_streamer =
  let open Lwt_syntax in
  let stream, stopper = Data_streamer.handle_subscribe hash_streamer in
  let shutdown () = Lwt_watcher.shutdown stopper in
  let next () = Lwt_stream.get stream in
  let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in
  Tezos_rpc.Answer.return_stream {next; shutdown}

(* Handler for subscribing to the streaming of root hashes via
   GET monitor/root_hashes RPC call. *)
let handle_monitor_saved_pages hash_streamer =
  let open Lwt_syntax in
  let stream, stopper = Data_streamer.handle_subscribe hash_streamer in
  let shutdown () = Lwt_watcher.shutdown stopper in
  let next () = Lwt_stream.get stream in
  let* () = Event.(emit handle_new_subscription_to_page_streamer ()) in
  Tezos_rpc.Answer.return_stream {next; shutdown}

let handle_get_certificate ctx root_hash =
  let open Lwt_result_syntax in
  let node_store = Node_context.get_node_store ctx Store_sigs.Read_only in
  let+ value_opt = Store.Certificate_store.find node_store root_hash in
  Option.map
    (fun Store.{aggregate_signature; witnesses} ->
      Certificate_repr.{aggregate_signature; witnesses; root_hash})
    value_opt

let handle_get_missing_page cctxt page_store dac_plugin root_hash =
  let open Lwt_result_syntax in
  let remote_store = Page_store.Remote.(init {cctxt; page_store}) in
  let* preimage =
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/5142
        Retrieve missing page from dac committee via "flooding". *)
    Page_store.Remote.load dac_plugin remote_store root_hash
  in
  let*! () = Event.(emit (fetched_missing_page dac_plugin) root_hash) in
  return preimage

let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer
    directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       (RPC_services.post_store_preimage ctx)
       (fun () input ->
         handle_post_store_preimage
           ctx
           cctxt
           dac_sk_uris
           page_store
           hash_streamer
           input)

let register_get_verify_signature dac_plugin public_keys_opt directory =
  directory
  |> add_service
       Tezos_rpc.Directory.register0
       RPC_services.get_verify_signature
       (fun external_message () ->
         handle_get_verify_signature dac_plugin public_keys_opt external_message)

let register_get_preimage dac_plugin page_store =
  add_service
    Tezos_rpc.Directory.register1
    (RPC_services.get_preimage dac_plugin)
    (fun hash () () -> handle_get_preimage dac_plugin page_store hash)

let register_monitor_root_hashes dac_plugin hash_streamer dir =
  Tezos_rpc.Directory.gen_register
    dir
    (Monitor_services.S.root_hashes dac_plugin)
    (fun () () () -> handle_monitor_root_hashes hash_streamer)

let register_monitor_saved_pages dac_plugin hash_streamer dir =
  Tezos_rpc.Directory.gen_register
    dir
    (Monitor_services.S.saved_pages dac_plugin)
    (fun () () () -> handle_monitor_saved_pages hash_streamer)

let register_put_dac_member_signature ctx dac_plugin cctxt =
  add_service
    Tezos_rpc.Directory.register0
    (RPC_services.put_dac_member_signature dac_plugin)
    (fun () dac_member_signature ->
      Signature_manager.Coordinator.handle_put_dac_member_signature
        ctx
        cctxt
        dac_member_signature)

let register_get_certificate ctx dac_plugin =
  add_service
    Tezos_rpc.Directory.register1
    (RPC_services.get_certificate dac_plugin)
    (fun root_hash () () -> handle_get_certificate ctx root_hash)

let register_get_missing_page ctx dac_plugin =
  match Node_context.mode ctx with
  | Legacy _ | Observer _ ->
      add_service
        Tezos_rpc.Directory.register1
        (RPC_services.get_missing_page dac_plugin)
        (fun root_hash () () ->
          let open Lwt_result_syntax in
          let page_store = Node_context.get_page_store ctx in
          let*? cctxt = Node_context.get_coordinator_client ctx in
          handle_get_missing_page cctxt page_store dac_plugin root_hash)
  | Coordinator _ | Committee_member _ -> Fun.id

module Coordinator = struct
  let handle_post_preimage dac_plugin page_store hash_streamer payload =
    let open Lwt_result_syntax in
    let* root_hash =
      Pages_encoding.Merkle_tree.V0.Streaming.serialize_payload
        dac_plugin
        ~page_store
        payload
    in
    let () = Data_streamer.publish hash_streamer root_hash in
    let*! () =
      Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash
    in
    return root_hash

  let register_coordinator_post_preimage dac_plugin hash_streamer page_store =
    add_service
      Tezos_rpc.Directory.register0
      (RPC_services.Coordinator.post_preimage dac_plugin)
      (fun () payload ->
        handle_post_preimage dac_plugin page_store hash_streamer payload)

  let dynamic_rpc_dir dac_plugin (node_ctxt : Node_context.t) =
    let modal_node_ctxt =
      match Node_context.mode node_ctxt with
      | Coordinator modal_node_ctxt -> modal_node_ctxt
      (* We only pass [Node_context.t] values whose mode is [Coordinator _] to
         this function. *)
      | _ -> assert false
    in

    let hash_streamer =
      modal_node_ctxt.Node_context.Coordinator.hash_streamer
    in
    let page_streamer =
      modal_node_ctxt.Node_context.Coordinator.page_streamer
    in
    let public_keys_opt =
      Node_context.Coordinator.public_keys_opt modal_node_ctxt
    in
    let page_store = Node_context.get_page_store node_ctxt in
    let cctxt = Node_context.get_tezos_node_cctxt node_ctxt in

    Tezos_rpc.Directory.empty
    |> register_coordinator_post_preimage
         dac_plugin
         hash_streamer
         modal_node_ctxt.streaming_page_store
    |> register_get_verify_signature dac_plugin public_keys_opt
    |> register_get_preimage dac_plugin page_store
    |> register_monitor_root_hashes dac_plugin hash_streamer
    |> register_put_dac_member_signature node_ctxt dac_plugin cctxt
    |> register_get_certificate node_ctxt dac_plugin
    |> register_monitor_saved_pages dac_plugin page_streamer
end

module Committee_member = struct
  let dynamic_rpc_dir dac_plugin (node_ctxt : Node_context.t) =
    let page_store = Node_context.get_page_store node_ctxt in
    Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store
end

module Observer = struct
  let dynamic_rpc_dir dac_plugin (node_ctxt : Node_context.t) =
    let page_store = Node_context.get_page_store node_ctxt in
    Tezos_rpc.Directory.empty
    |> register_get_preimage dac_plugin page_store
    |> register_get_missing_page node_ctxt dac_plugin
end

module Legacy = struct
  let dynamic_rpc_dir dac_plugin (node_ctxt : Node_context.t) =
    let modal_node_ctxt =
      match Node_context.mode node_ctxt with
      | Legacy modal_node_ctxt -> modal_node_ctxt
      (* We only pass [Node_context.t] values whose mode is [Coordinator _] to
         this function. *)
      | _ -> assert false
    in
    let hash_streamer = modal_node_ctxt.Node_context.Legacy.hash_streamer in
    let page_streamer = modal_node_ctxt.Node_context.Legacy.page_streamer in
    let public_keys_opt = Node_context.Legacy.public_keys_opt modal_node_ctxt in
    let secret_key_uris_opt =
      Node_context.Legacy.secret_key_uris_opt modal_node_ctxt
    in
    let page_store = Node_context.get_page_store node_ctxt in
    let cctxt = Node_context.get_tezos_node_cctxt node_ctxt in
    Tezos_rpc.Directory.empty
    |> register_post_store_preimage
         dac_plugin
         cctxt
         secret_key_uris_opt
         modal_node_ctxt.streaming_page_store
         hash_streamer
    |> register_get_verify_signature dac_plugin public_keys_opt
    |> register_get_preimage dac_plugin page_store
    |> register_monitor_root_hashes dac_plugin hash_streamer
    |> register_put_dac_member_signature node_ctxt dac_plugin cctxt
    |> register_get_certificate node_ctxt dac_plugin
    |> register_get_missing_page node_ctxt dac_plugin
    |> register_monitor_saved_pages dac_plugin page_streamer
end

let start ~rpc_address ~rpc_port node_ctxt =
  let open Lwt_syntax in
  let register_dynamic_rpc dac_plugin =
    match Node_context.mode node_ctxt with
    | Coordinator _ -> Coordinator.dynamic_rpc_dir dac_plugin node_ctxt
    | Committee_member _ ->
        Committee_member.dynamic_rpc_dir dac_plugin node_ctxt
    | Observer _ -> Observer.dynamic_rpc_dir dac_plugin node_ctxt
    | Legacy _ -> Legacy.dynamic_rpc_dir dac_plugin node_ctxt
  in
  let dir =
    Tezos_rpc.Directory.register_dynamic_directory
      Tezos_rpc.Directory.empty
      Tezos_rpc.Path.open_root
      (fun () ->
        match Node_context.get_status node_ctxt with
        | Ready {dac_plugin = (module Dac_plugin)} ->
            Lwt.return (register_dynamic_rpc (module Dac_plugin))
        | Starting -> Lwt.return Tezos_rpc.Directory.empty)
  in
  let rpc_address = P2p_addr.of_string_exn rpc_address in
  let host = Ipaddr.V6.to_string rpc_address in
  let node = `TCP (`Port rpc_port) in
  let acl = RPC_server.Acl.default rpc_address in
  let server =
    RPC_server.init_server dir ~acl ~media_types:Media_type.all_media_types
  in
  Lwt.catch
    (fun () ->
      let* () =
        RPC_server.launch
          ~host
          server
          ~callback:(RPC_server.resto_callback server)
          node
      in
      return_ok server)
    fail_with_exn

let shutdown = RPC_server.shutdown

let install_finalizer rpc_server =
  let open Lwt_syntax in
  Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun exit_status ->
  let* () = shutdown rpc_server in
  let* () = Event.(emit shutdown_node exit_status) in
  Tezos_base_unix.Internal_event_unix.close ()
back to top