Revision e9e27cce8a171ce033cd42abad598c27b07df555 authored by Valentin Chaboche on 13 September 2023, 10:43:35 UTC, committed by Marge Bot on 22 September 2023, 07:41:44 UTC
1 parent 262d346
Raw File
layer1.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com>               *)
(* Copyright (c) 2023 Functori, <contact@functori.com>                       *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(**

    Errors
    ======

*)

type error += Cannot_find_block of Block_hash.t

let () =
  register_error_kind
    ~id:"sc_rollup.node.cannot_find_block"
    ~title:"Cannot find block from L1"
    ~description:"A block couldn't be found from the L1 node"
    ~pp:(fun ppf hash ->
      Format.fprintf
        ppf
        "Block with hash %a was not found on the L1 node."
        Block_hash.pp
        hash)
    `Temporary
    Data_encoding.(obj1 (req "hash" Block_hash.encoding))
    (function Cannot_find_block hash -> Some hash | _ -> None)
    (fun hash -> Cannot_find_block hash)

(**

   State
   =====

*)

type header = {
  hash : Block_hash.t;
  level : int32;
  header : Block_header.shell_header;
}

let header_encoding =
  let open Data_encoding in
  conv
    (fun {hash; level = _; header} -> (hash, header))
    (fun (hash, header) -> {hash; level = header.level; header})
    (merge_objs
       (obj1 (req "hash" Block_hash.encoding))
       Block_header.shell_header_encoding)

type head = {hash : Block_hash.t; level : int32}

let head_encoding =
  let open Data_encoding in
  conv
    (fun {hash; level} -> (hash, level))
    (fun (hash, level) -> {hash; level})
    (obj2 (req "hash" Block_hash.encoding) (req "level" Data_encoding.int32))

let head_of_header {hash; level; header = _} = {hash; level}

module Blocks_cache =
  Aches_lwt.Lache.Make_result
    (Aches.Rache.Transfer (Aches.Rache.LRU) (Block_hash))

type block = ..

type fetch_block_rpc =
  Client_context.full ->
  ?metadata:[`Always | `Never] ->
  ?chain:Tezos_shell_services.Block_services.chain ->
  ?block:Tezos_shell_services.Block_services.block ->
  unit ->
  block tzresult Lwt.t

type headers_cache = (Block_header.shell_header, tztrace) Blocks_cache.t

type blocks_cache = (block, tztrace) Blocks_cache.t

open Octez_crawler.Layer_1

type nonrec t = {
  l1 : t;
  cctxt : Client_context.full;
  blocks_cache : blocks_cache;
      (** Global blocks cache for the smart rollup node. *)
  headers_cache : headers_cache;
      (** Global block headers cache for the smart rollup node. *)
  prefetch_blocks : int;  (** Number of blocks to prefetch by default. *)
}

let start ~name ~reconnection_delay ~l1_blocks_cache_size ?protocols
    ?(prefetch_blocks = l1_blocks_cache_size) cctxt =
  let open Lwt_result_syntax in
  let*? () =
    if prefetch_blocks > l1_blocks_cache_size then
      error_with
        "Blocks to prefetch must be less than the cache size: %d"
        l1_blocks_cache_size
    else Ok ()
  in
  let*! l1 = start ~name ~reconnection_delay ?protocols cctxt in
  let blocks_cache = Blocks_cache.create l1_blocks_cache_size in
  let headers_cache = Blocks_cache.create l1_blocks_cache_size in
  let cctxt = (cctxt :> Client_context.full) in
  return {l1; cctxt; blocks_cache; headers_cache; prefetch_blocks}

let shutdown {l1; _} = shutdown l1

let cache_shell_header {headers_cache; _} hash header =
  Blocks_cache.put headers_cache hash (Lwt.return_ok header)

let client_context {cctxt; _} = cctxt

let iter_heads l1_ctxt f =
  iter_heads l1_ctxt.l1 @@ fun (hash, {shell = {level; _} as header; _}) ->
  cache_shell_header l1_ctxt hash header ;
  f {hash; level; header}

let wait_first l1_ctxt =
  let open Lwt_syntax in
  let+ hash, {shell = {level; _} as header; _} = wait_first l1_ctxt.l1 in
  {hash; level; header}

let get_predecessor_opt ?max_read {l1; _} = get_predecessor_opt ?max_read l1

let get_predecessor ?max_read {l1; _} = get_predecessor ?max_read l1

let get_tezos_reorg_for_new_head {l1; _} ?get_old_predecessor old_head new_head
    =
  get_tezos_reorg_for_new_head l1 ?get_old_predecessor old_head new_head

module Internal_for_tests = struct
  let dummy cctxt =
    {
      l1 = Internal_for_tests.dummy cctxt;
      cctxt = (cctxt :> Client_context.full);
      blocks_cache = Blocks_cache.create 1;
      headers_cache = Blocks_cache.create 1;
      prefetch_blocks = 0;
    }
end

(**

   Helpers
   =======

*)

(** [fetch_tezos_block cctxt hash] returns a block shell header of
    [hash]. Looks for the block in the blocks cache first, and fetches it from
    the L1 node otherwise. *)
let fetch_tezos_shell_header {cctxt; headers_cache; _} hash =
  trace (Cannot_find_block hash)
  @@
  let fetch hash =
    Tezos_shell_services.Shell_services.Blocks.Header.shell_header
      cctxt
      ~chain:`Main
      ~block:(`Hash (hash, 0))
      ()
  in
  Blocks_cache.bind_or_put headers_cache hash fetch Lwt.return

let fetch_block_no_cache (fetch : fetch_block_rpc) extract_header
    ({cctxt; blocks_cache; _} as l1_ctxt) hash =
  let open Lwt_result_syntax in
  trace (Cannot_find_block hash)
  @@ let* block =
       fetch cctxt ~chain:`Main ~block:(`Hash (hash, 0)) ~metadata:`Always ()
     in
     Blocks_cache.put blocks_cache hash (Lwt.return_ok block) ;
     cache_shell_header l1_ctxt hash (extract_header block) ;
     return block

(** [fetch_tezos_block cctxt fetch extract_header hash] returns a block info
    given a block hash. Looks for the block in the blocks cache first, and
    fetches it from the L1 node otherwise. *)
let fetch_tezos_block (fetch_rpc : fetch_block_rpc) extract_header
    ({cctxt; blocks_cache; _} as l1_ctxt) hash =
  trace (Cannot_find_block hash)
  @@
  let open Lwt_result_syntax in
  let fetch hash =
    let* block =
      fetch_rpc cctxt ~chain:`Main ~block:(`Hash (hash, 0)) ~metadata:`Always ()
    in
    cache_shell_header l1_ctxt hash (extract_header block) ;
    return block
  in
  let*! block = Blocks_cache.bind_or_put blocks_cache hash fetch Lwt.return in
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/6292
     Consider cleaner ways to "prefetch" tezos blocks:
     - know before where are protocol boundaries
     - prefetch blocks in binary form *)
  let is_of_expected_protocol =
    match block with
    | Error
        (Tezos_rpc_http.RPC_client_errors.(
           Request_failed {error = Unexpected_content _; _})
        :: _) ->
        (* The promise cached failed to parse the block because it was for the
           wrong protocol. *)
        false
    | Error _ ->
        (* The promise cached failed for another reason. *)
        true
    | Ok block -> (
        (* We check if we are able to extract the header, which inherently
           ensures that we are in the correct case of type {!type:block}. *)
        try
          let (_ : Block_header.shell_header) = extract_header block in
          true
        with _ ->
          (* This can happen if the blocks for two protocols have the same
             binary representation. *)
          false)
  in
  if is_of_expected_protocol then Lwt.return block
  else
    (* It is possible for a value stored in the cache to have been parsed
       with the wrong protocol code because:
       1. There is no protocol information in binary blocks
       2. We pre-fetch blocks eagerly.

       If we realize a posteriori that the block we cached is for another
       protocol then we overwrite it with the correctly parsed one.
    *)
    fetch_block_no_cache fetch_rpc extract_header l1_ctxt hash

let make_prefetching_schedule {prefetch_blocks; _} blocks =
  let blocks_with_prefetching, _, first_prefetch =
    List.fold_left
      (fun (acc, nb_prefetch, prefetch) b ->
        let nb_prefetch = nb_prefetch + 1 in
        let prefetch = b :: prefetch in
        if nb_prefetch >= prefetch_blocks then ((b, prefetch) :: acc, 0, [])
        else ((b, []) :: acc, nb_prefetch, prefetch))
      ([], 0, [])
      (List.rev blocks)
  in
  match (blocks_with_prefetching, first_prefetch) with
  | [], _ | _, [] -> blocks_with_prefetching
  | (first, _) :: rest, _ -> (first, first_prefetch) :: rest

let prefetch_tezos_blocks fetch extract_header l1_ctxt = function
  | [] -> ()
  | blocks ->
      Lwt.async @@ fun () ->
      List.iter_p
        (fun {hash; _} ->
          let open Lwt_syntax in
          let+ _maybe_block =
            fetch_tezos_block fetch extract_header l1_ctxt hash
          in
          ())
        blocks
back to top