Raw File
layer_1.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) Nomadic Labs, <contact@nomadic-labs.com>                    *)
(* Copyright (c) Functori, <contact@functori.com>                            *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(**

    Errors
    ======

*)

type error += Cannot_find_predecessor of Block_hash.t

let () =
  register_error_kind
    ~id:"lib_crawler.cannot_find_predecessor"
    ~title:"Cannot find block predecessor from L1"
    ~description:"A predecessor couldn't be found from the L1 node"
    ~pp:(fun ppf hash ->
      Format.fprintf
        ppf
        "Block with hash %a has no predecessor on the L1 node."
        Block_hash.pp
        hash)
    `Temporary
    Data_encoding.(obj1 (req "hash" Block_hash.encoding))
    (function Cannot_find_predecessor hash -> Some hash | _ -> None)
    (fun hash -> Cannot_find_predecessor hash)

(**

   State
   =====

*)

type t = {
  name : string;
  protocols : Protocol_hash.t list option;
  reconnection_delay : float;
  heads : (Block_hash.t * Block_header.t) Lwt_stream.t;
  cctxt : Client_context.full;
  stopper : Tezos_rpc.Context.stopper;
  mutable running : bool;
}

let rec connect ~name ?(count = 0) ~delay ~protocols cctxt =
  let open Lwt_syntax in
  let* () =
    if count = 0 then return_unit
    else
      let fcount = float_of_int (count - 1) in
      (* Randomized exponential backoff capped to 1.5h: 1.5^count * delay ± 50% *)
      let delay = delay *. (1.5 ** fcount) in
      let delay = min delay 3600. in
      let randomization_factor = 0.5 (* 50% *) in
      let delay =
        delay
        +. Random.float (delay *. 2. *. randomization_factor)
        -. (delay *. randomization_factor)
      in
      let* () = Layer1_event.wait_reconnect ~name delay in
      Lwt_unix.sleep delay
  in
  let* res =
    Tezos_shell_services.Monitor_services.heads ?protocols cctxt cctxt#chain
  in
  match res with
  | Ok (heads, stopper) ->
      let heads =
        Lwt_stream.map_s
          (fun ( hash,
                 (Tezos_base.Block_header.{shell = {level; _}; _} as header) ) ->
            let+ () = Layer1_event.switched_new_head ~name hash level in
            (hash, header))
          heads
      in
      return (heads, stopper)
  | Error e ->
      let* () = Layer1_event.cannot_connect ~name ~count e in
      connect ~name ~delay ~protocols ~count:(count + 1) cctxt

let start ~name ~reconnection_delay ?protocols (cctxt : #Client_context.full) =
  let open Lwt_syntax in
  let* () = Layer1_event.starting ~name in
  let+ heads, stopper =
    connect ~name ~delay:reconnection_delay ~protocols cctxt
  in
  {
    name;
    cctxt = (cctxt :> Client_context.full);
    heads;
    stopper;
    reconnection_delay;
    protocols;
    running = true;
  }

let reconnect l1_ctxt =
  let open Lwt_syntax in
  let* heads, stopper =
    connect
      ~name:l1_ctxt.name
      ~count:1
      ~delay:l1_ctxt.reconnection_delay
      ~protocols:l1_ctxt.protocols
      l1_ctxt.cctxt
  in
  return {l1_ctxt with heads; stopper}

let shutdown state =
  state.stopper () ;
  state.running <- false ;
  Lwt.return_unit

let regexp_ocaml_exception_connection_error = Re.Str.regexp ".*in connect:.*"

let is_connection_error trace =
  TzTrace.fold
    (fun yes error ->
      yes
      ||
      match error with
      | RPC_client_errors.(Request_failed {error = Connection_failed _; _}) ->
          true
      | RPC_client_errors.(Request_failed {error = OCaml_exception s; _}) ->
          (* This error can surface if the external RPC servers of the L1 node are
             shutdown but the request is still in the RPC worker. *)
          Re.Str.string_match regexp_ocaml_exception_connection_error s 0
      | _ -> false)
    false
    trace

(* TODO: https://gitlab.com/tezos/tezos/-/issues/2895
   Use Lwt_stream.iter_es once it is exposed. *)
let iter_heads l1_ctxt f =
  let exception Iter_error of tztrace in
  let rec loop l1_ctxt =
    let open Lwt_result_syntax in
    let*! () =
      Lwt_stream.iter_s
        (fun head ->
          let open Lwt_syntax in
          let* res = f head in
          match res with
          | Ok () -> return_unit
          | Error trace when is_connection_error trace ->
              Format.eprintf
                "@[<v 2>Connection error:@ %a@]@."
                pp_print_trace
                trace ;
              l1_ctxt.stopper () ;
              return_unit
          | Error e -> raise (Iter_error e))
        l1_ctxt.heads
    in
    when_ l1_ctxt.running @@ fun () ->
    let*! () = Layer1_event.connection_lost ~name:l1_ctxt.name in
    let*! l1_ctxt = reconnect l1_ctxt in
    loop l1_ctxt
  in
  Lwt.catch
    (fun () -> Lwt.no_cancel @@ loop l1_ctxt)
    (function Iter_error e -> Lwt.return_error e | exn -> fail_with_exn exn)

let wait_first l1_ctxt =
  let rec loop l1_ctxt =
    let open Lwt_syntax in
    let* head = Lwt_stream.peek l1_ctxt.heads in
    match head with
    | Some head -> return head
    | None ->
        let* l1_ctxt = reconnect l1_ctxt in
        loop l1_ctxt
  in
  Lwt.no_cancel @@ loop l1_ctxt

(** [predecessors_of_blocks hashes] given a list of successive block hashes,
    from newest to oldest, returns an associative list that associates a hash to
    its predecessor in this list. *)
let predecessors_of_blocks hashes =
  let rec aux next = function [] -> [] | x :: xs -> (next, x) :: aux x xs in
  match hashes with [] -> [] | x :: xs -> aux x xs

(** [get_predecessor block_hash] returns the predecessor block hash of
    some [block_hash] through an RPC to the Tezos node. To limit the
    number of RPCs, this information is requested for a batch of hashes
    and cached locally. *)
let get_predecessor =
  let max_cached = 65536 in
  let hard_max_read = max_cached in
  (* 2MB *)
  let module HM =
    Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) (Block_hash)
  in
  let cache = HM.create max_cached in
  fun ~max_read
      cctxt
      (chain : Tezos_shell_services.Chain_services.chain)
      ancestor ->
    let open Lwt_result_syntax in
    (* Don't read more than the hard limit in one RPC call. *)
    let max_read = min max_read hard_max_read in
    (* But read at least two because the RPC also returns the head. *)
    let max_read = max max_read 2 in
    match HM.find_opt cache ancestor with
    | Some pred -> return_some pred
    | None -> (
        let* blocks =
          Tezos_shell_services.Chain_services.Blocks.list
            cctxt
            ~chain
            ~heads:[ancestor]
            ~length:max_read
            ()
        in
        match blocks with
        | [ancestors] -> (
            List.iter
              (fun (h, p) -> HM.replace cache h p)
              (predecessors_of_blocks ancestors) ;
            match HM.find_opt cache ancestor with
            | None ->
                (* This could happen if ancestors was empty, but it shouldn't be. *)
                return_none
            | Some predecessor -> return_some predecessor)
        | _ -> return_none)

let get_predecessor_opt ?(max_read = 8) state (hash, level) =
  let open Lwt_result_syntax in
  if level = 0l then return_none
  else
    let level = Int32.pred level in
    let+ hash = get_predecessor ~max_read state.cctxt state.cctxt#chain hash in
    Option.map (fun hash -> (hash, level)) hash

let get_predecessor ?max_read state ((hash, _) as head) =
  let open Lwt_result_syntax in
  let* pred = get_predecessor_opt ?max_read state head in
  match pred with
  | None -> tzfail (Cannot_find_predecessor hash)
  | Some pred -> return pred

let nth_predecessor ~get_predecessor n block =
  let open Lwt_result_syntax in
  assert (n >= 0) ;
  let rec aux acc n block =
    if n = 0 then return (block, acc)
    else
      let* pred = get_predecessor block in
      (aux [@tailcall]) (block :: acc) (n - 1) pred
  in
  aux [] n block

let get_tezos_reorg_for_new_head l1_state
    ?(get_old_predecessor = get_predecessor l1_state) old_head new_head =
  let open Lwt_result_syntax in
  (* old_head and new_head must have the same level when calling aux *)
  let rec aux reorg old_head new_head =
    let old_head_hash, _ = old_head in
    let new_head_hash, _ = new_head in
    if Block_hash.(old_head_hash = new_head_hash) then return reorg
    else
      let* old_head_pred = get_old_predecessor old_head in
      let* new_head_pred = get_predecessor l1_state new_head in
      let reorg =
        Reorg.
          {
            old_chain = old_head :: reorg.old_chain;
            new_chain = new_head :: reorg.new_chain;
          }
      in
      (aux [@tailcall]) reorg old_head_pred new_head_pred
  in
  (* computing partial reorganization to make old_head and new_head at same
     level *)
  let _, old_head_level = old_head in
  let _, new_head_level = new_head in
  let distance = Int32.(to_int @@ abs @@ sub new_head_level old_head_level) in
  let* old_head, new_head, reorg =
    if old_head_level = new_head_level then
      return (old_head, new_head, Reorg.no_reorg)
    else if old_head_level < new_head_level then
      let max_read = distance + 1 (* reading includes the head *) in
      let+ new_head, new_chain =
        nth_predecessor
          ~get_predecessor:(get_predecessor ~max_read l1_state)
          distance
          new_head
      in
      (old_head, new_head, {Reorg.no_reorg with new_chain})
    else
      let+ old_head, old_chain =
        nth_predecessor ~get_predecessor:get_old_predecessor distance old_head
      in
      (old_head, new_head, {Reorg.no_reorg with old_chain})
  in
  assert (snd old_head = snd new_head) ;
  aux reorg old_head new_head

(** Returns the reorganization of L1 blocks (if any) for [new_head]. *)
let get_tezos_reorg_for_new_head l1_state ?get_old_predecessor old_head new_head
    =
  let open Lwt_result_syntax in
  match old_head with
  | `Level l ->
      let _, new_head_level = new_head in
      (* No known tezos head, we want all blocks from l. *)
      if new_head_level < l then return Reorg.no_reorg
      else
        let distance = Int32.sub new_head_level l |> Int32.to_int in
        let max_read = distance + 1 (* reading includes the head *) in
        let* _block_at_l, new_chain =
          nth_predecessor
            ~get_predecessor:(get_predecessor ~max_read l1_state)
            distance
            new_head
        in
        return Reorg.{old_chain = []; new_chain}
  | `Head old_head ->
      get_tezos_reorg_for_new_head
        l1_state
        ?get_old_predecessor
        old_head
        new_head

module Internal_for_tests = struct
  let dummy cctxt =
    let heads, _push = Lwt_stream.create () in
    {
      name = "dummy_layer_1_for_tests";
      reconnection_delay = 5.0;
      heads;
      cctxt = (cctxt :> Client_context.full);
      stopper = Fun.id;
      protocols = None;
      running = false;
    }
end
back to top