https://gitlab.com/tezos/tezos
Raw File
Tip revision: ce7d752cc8d1fb72a99a96158c2a1ba4e3a8ed13 authored by Thomas Letan on 29 May 2023, 14:13:29 UTC
Teztible: Write experiments and scenarios in Yaml scripts
Tip revision: ce7d752
store.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(* FIXME: https://gitlab.com/tezos/tezos/-/issues/3207
   use another storage solution than irmin as we don't need backtracking *)

(* FIXME: https://gitlab.com/tezos/tezos/-/issues/4097
   Add an interface to this module *)

(* Relative path to store directory from base-dir *)
let path = "store"

(* Key-value store built from [Irmin_pack_unix.KV], configured with
   [Tezos_context_encoding.Context.Conf] and holding string contents. The
   resulting store module is included here, so its definitions are available
   unqualified in the rest of this file. *)
module StoreMaker = Irmin_pack_unix.KV (Tezos_context_encoding.Context.Conf)
include StoreMaker.Make (Irmin.Contents.String)

(* Name of the shards directory; [init] below hands it to [Shards.init]
   together with the store's base directory. *)
let shard_store_dir = "shard_store"

(* [info message] builds the commit info used by [set] and [remove]: author
   ["DAL Node"], the given [message], and the current Unix time truncated to
   whole seconds.

   The float timestamp is converted directly to [int64]: going through a native
   [int] first would overflow on platforms where [int] is 31 bits wide. *)
let info message =
  let date = Int64.of_float (Unix.gettimeofday ()) in
  Info.v ~author:"DAL Node" ~message date

(* [set ~msg store path v] binds [v] to [path] in [store] through [set_exn],
   using [info msg] as commit info. *)
let set ~msg store path v =
  let commit_info () = info msg in
  set_exn store path v ~info:commit_info

(* [remove ~msg store path] removes [path] from [store] through [remove_exn],
   using [info msg] as commit info. *)
let remove ~msg store path =
  let commit_info () = info msg in
  remove_exn store path ~info:commit_info

module Shards = struct
  include Key_value_store

  (* Shares indexed by a commitment and a shard index. *)
  type nonrec t = (Cryptobox.Commitment.t * int, Cryptobox.share) t

  (* TODO: https://gitlab.com/tezos/tezos/-/issues/4973
     Make storage more resilient to DAL parameters change. *)

  (* [are_shards_available store commitment shard_indexes] holds when a value
     can be read from [store] for every index of [shard_indexes]. A
     [Missing_stored_data] error for an index means "not available"; any other
     read error is propagated. *)
  let are_shards_available store commitment shard_indexes =
    let open Lwt_result_syntax in
    let is_stored index =
      let*! outcome = read_value store (commitment, index) in
      match outcome with
      | Ok _ -> return true
      | Error [Stored_data.Missing_stored_data _] -> return false
      | Error trace -> fail trace
    in
    List.for_all_es is_stored shard_indexes

  (* [save_and_notify shards_store shards_watcher commitment shards] writes
     every share of [shards] under the key [(commitment, index)], emits the
     [stored_slot_shards] event with the number of shards, and finally notifies
     [shards_watcher] with [commitment]. *)
  let save_and_notify shards_store shards_watcher commitment shards =
    let open Lwt_result_syntax in
    let keyed_shares =
      Seq.map
        (fun {Cryptobox.index; share} -> ((commitment, index), share))
        shards
    in
    let* () =
      Errors.other_lwt_result (write_values shards_store keyed_shares)
    in
    let*! () =
      Event.(emit stored_slot_shards (commitment, Seq.length keyed_shares))
    in
    (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4974

       DAL/Node: rehaul the store abstraction & notification system.
    *)
    Lwt_watcher.notify shards_watcher commitment ;
    return_unit

  (* [read_all shards_store commitment ~number_of_shards] reads the values
     stored for [commitment] at every shard index in
     [0 -- (number_of_shards - 1)]. Keys are consed one after the other, so the
     key sequence runs from the highest index down to [0]. *)
  let read_all shards_store commitment ~number_of_shards =
    let keys =
      List.fold_left
        (fun keys shard_index -> Seq.cons (commitment, shard_index) keys)
        Seq.empty
        (0 -- (number_of_shards - 1))
    in
    read_values shards_store keys

  (* [init node_store_dir shard_store_dir] creates the shard store. The share
     of shard [index] of [commitment] lives in the file
     [node_store_dir/shard_store_dir/<b58check commitment>/<index>]. *)
  let init node_store_dir shard_store_dir =
    let ( // ) = Filename.concat in
    let dir_path = node_store_dir // shard_store_dir in
    let file_of_key (commitment, index) =
      let filepath =
        dir_path
        // Cryptobox.Commitment.to_b58check commitment
        // string_of_int index
      in
      Stored_data.make_file ~filepath Cryptobox.share_encoding Stdlib.( = )
    in
    init ~lru_size:Constants.shards_store_lru_size file_of_key
end

(* Cache keyed by commitments, obtained by instantiating [Aches.Vache.Map]
   with [Aches.Vache.LRU_Precise] and [Aches.Vache.Strong]. Keys are compared
   with [Cryptobox.Commitment.equal] and hashed with the polymorphic
   [Hashtbl.hash].

   NOTE(review): [Hashtbl.hash] hashes the runtime representation of the
   commitment; confirm that two commitments that are [equal] always hash to the
   same value. *)
module Shard_proofs_cache =
  Aches.Vache.Map (Aches.Vache.LRU_Precise) (Aches.Vache.Strong)
    (struct
      type t = Cryptobox.Commitment.t

      let equal = Cryptobox.Commitment.equal

      let hash = Hashtbl.hash
    end)

(** Store context *)
type node_store = {
  store : t;  (** The Irmin store (branch obtained with [main] in [init]). *)
  shard_store : Shards.t;  (** Key-value store holding the shards. *)
  shards_watcher : Cryptobox.Commitment.t Lwt_watcher.input;
      (** Watcher input from which [open_shards_stream] creates streams. *)
  gs_worker : Gossipsub.Worker.t;
      (** Gossipsub worker, fed through [Gossipsub.Worker.app_input]. *)
  in_memory_shard_proofs : Cryptobox.shard_proof array Shard_proofs_cache.t;
      (** Cache associating a commitment with an array of shard proofs. *)
}

(** [open_shards_stream node_store] creates a new stream on the shards watcher
    of [node_store]; the stream should be notified when the storage is updated
    with new shards. *)
let open_shards_stream node_store =
  Lwt_watcher.create_stream node_store.shards_watcher

(** [init gs_worker config] sets the store up on the filesystem, under the
    directory derived from [config] and [path], and returns the store context
    holding [gs_worker]. The [store_is_ready] event is emitted once the Irmin
    store and the shard store are created. *)
let init gs_worker config =
  let open Lwt_result_syntax in
  let base_dir = Configuration.data_dir_path config path in
  let shards_watcher = Lwt_watcher.create_input () in
  let*! repo = Repo.v (Irmin_pack.config base_dir) in
  let*! store = main repo in
  let shard_store = Shards.init base_dir shard_store_dir in
  let*! () = Event.(emit store_is_ready ()) in
  let in_memory_shard_proofs =
    (* A cache of 1024 entries is chosen such that: if a DAL node stores the
       shard proofs of 128 slots per level, the cache holds the proofs for 8
       levels, which should be quite sufficient with the current attestation
       lag.

       A shard proof takes 52 bytes with the current encoding (could be
       improved to 48), so the maximum memory footprint of the cache is
       dominated by (keys size is negligible):

       1024 (cache size) * 2048 (shards per slot) * 52 bytes ~= 109 MB *)
    let cache_size = 1024 in
    Shard_proofs_cache.create cache_size
  in
  return {store; shard_store; shards_watcher; gs_worker; in_memory_shard_proofs}

(* [trace_decoding_error ~data_kind ~tztrace_of_error r] leaves [Ok] results
   untouched and turns [Error err] into
   [Error (`Decoding_failed (data_kind, tztrace_of_error err))]. *)
let trace_decoding_error ~data_kind ~tztrace_of_error = function
  | Ok decoded -> Ok decoded
  | Error err -> Error (`Decoding_failed (data_kind, tztrace_of_error err))

(* Wraps a binary read error into a one-element trace, as an [Exn] carrying
   [Data_encoding.Binary.Read_error]. *)
let tztrace_of_read_error read_err =
  let exn = Data_encoding.Binary.Read_error read_err in
  [Exn exn]

(* Commitments are stored as their b58check representation. *)
let encode_commitment = Cryptobox.Commitment.to_b58check

(* Parses a b58check-encoded commitment; a failure is reported as
   [`Decoding_failed (Types.Commitment, trace)] where [trace] is the error
   returned by [Cryptobox.Commitment.of_b58check]. *)
let decode_commitment v =
  let parsed = Cryptobox.Commitment.of_b58check v in
  trace_decoding_error
    ~data_kind:Types.Commitment
    ~tztrace_of_error:(fun trace -> trace)
    parsed

(* Binary serialization of a header status with
   [Services.Types.header_status_encoding]. *)
let encode_header_status status =
  Data_encoding.Binary.to_string_exn
    Services.Types.header_status_encoding
    status

(* Reads back a header status written by [encode_header_status]; a read error
   is reported as [`Decoding_failed (Types.Header_status, _)]. *)
let decode_header_status v =
  let parsed =
    Data_encoding.Binary.of_string Services.Types.header_status_encoding v
  in
  trace_decoding_error
    ~data_kind:Types.Header_status
    ~tztrace_of_error:tztrace_of_read_error
    parsed

(* Reads a slot identifier serialized with [Services.Types.slot_id_encoding];
   a read error is reported as [`Decoding_failed (Types.Slot_id, _)]. *)
let decode_slot_id v =
  let parsed =
    Data_encoding.Binary.of_string Services.Types.slot_id_encoding v
  in
  trace_decoding_error
    ~data_kind:Types.Slot_id
    ~tztrace_of_error:tztrace_of_read_error
    parsed

(* [encode_slot slot_size] is the binary serializer for slots, using a
   fixed-length bytes encoding of [slot_size] bytes. *)
let encode_slot slot_size =
  let slot_encoding = Data_encoding.Fixed.bytes slot_size in
  Data_encoding.Binary.to_string_exn slot_encoding

(* [decode_slot slot_size v] reads back a slot of [slot_size] bytes from [v];
   a read error is reported as [`Decoding_failed (Types.Slot, _)]. *)
let decode_slot slot_size v =
  let slot_encoding = Data_encoding.Fixed.bytes slot_size in
  Data_encoding.Binary.of_string slot_encoding v
  |> trace_decoding_error
       ~data_kind:Types.Slot
       ~tztrace_of_error:tztrace_of_read_error

(* Binary serialization of a profile with [Services.Types.profile_encoding]. *)
let encode_profile profile =
  profile |> Data_encoding.Binary.to_string_exn Services.Types.profile_encoding

(* Reads back a profile written by [encode_profile]; a read error is reported
   as [`Decoding_failed (Types.Profile, _)]. *)
let decode_profile profile =
  let parsed =
    Data_encoding.Binary.of_string Services.Types.profile_encoding profile
  in
  trace_decoding_error
    ~data_kind:Types.Profile
    ~tztrace_of_error:tztrace_of_read_error
    parsed

(* FIXME: https://gitlab.com/tezos/tezos/-/issues/4975

   DAL/Node: Replace Irmin storage for paths
*)
module Legacy = struct
  module Path : sig
    (** A path in the store, as a list of steps. *)
    type t = string list

    (** [to_string ?prefix p] joins the steps of [p] with ["/"]. When [prefix]
        is given, it is prepended verbatim (no separator is inserted). *)
    val to_string : ?prefix:string -> t -> string

    module Commitment : sig
      (** Paths rooted at ["commitments"/<b58check commitment>]. *)

      val slot : Cryptobox.commitment -> slot_size:int -> Path.t

      val headers : Cryptobox.commitment -> Path.t

      val header : Cryptobox.commitment -> Services.Types.slot_id -> Path.t

      val shards : Cryptobox.commitment -> Path.t

      type shard_index := int

      val shard :
        Cryptobox.commitment ->
        redundancy_factor:int ->
        number_of_shards:int ->
        shard_index ->
        Path.t
    end

    module Level : sig
      (**
         Part of the storage for slots' headers where paths are indexed by slots
         indices.

         "Accepted" path(s) are used to store information about slots headers
         that are either [`Waiting_attestation], [`Attested], or [`Unattested].

         "Others" path(s) are used to store information of slots headers when
         their statuses are [`Not_selected] or [`Unseen]. *)

      val slots_indices : Services.Types.level -> Path.t

      val accepted_header_commitment : Services.Types.slot_id -> Path.t

      val accepted_header_status : Services.Types.slot_id -> Path.t

      val others : Services.Types.slot_id -> Path.t

      val other_header_status :
        Services.Types.slot_id -> Cryptobox.commitment -> Path.t
    end

    module Profile : sig
      val profiles : Path.t

      val profile : Services.Types.profile -> Path.t
    end
  end = struct
    type t = string list

    (* Paths are assembled by consing the fixed leading steps onto a literal
       suffix. This avoids the chain of [List.append]s (one per step) that
       https://gitlab.com/tezos/tezos/-/issues/4457 asked to get rid of, while
       producing exactly the same lists. *)

    let to_string ?prefix p =
      let joined = String.concat "/" p in
      match prefix with None -> joined | Some pr -> pr ^ joined

    module Commitment = struct
      let root = "commitments"

      (* [under commitment suffix] is
         ["commitments"; <b58check commitment>] followed by [suffix]. *)
      let under commitment suffix =
        root :: Cryptobox.Commitment.to_b58check commitment :: suffix

      let slot commitment ~slot_size =
        under commitment [Int.to_string slot_size; "slot"]

      let headers commitment = under commitment ["headers"]

      (* The last step is the binary encoding of the slot identifier, which
         lets readers recover the identifier from the path alone. *)
      let header commitment index =
        let index_repr =
          Data_encoding.Binary.to_string_exn
            Services.Types.slot_id_encoding
            index
        in
        under commitment ["headers"; index_repr]

      let shards commitment = under commitment ["shards"]

      let shard commitment ~redundancy_factor ~number_of_shards index =
        let parameters_repr =
          Printf.sprintf "%d-%d" redundancy_factor number_of_shards
        in
        under
          commitment
          [
            "shards";
            "parameters";
            parameters_repr;
            "index";
            Int.to_string index;
          ]
    end

    module Level = struct
      let root = "levels"

      let slots_indices slot_level = [root; Int32.to_string slot_level]

      (* [under_slot slot_id suffix] is
         ["levels"; <slot level>; <slot index>] followed by [suffix]. *)
      let under_slot {Services.Types.slot_level; slot_index} suffix =
        root :: Int32.to_string slot_level :: Int.to_string slot_index
        :: suffix

      let accepted_header_commitment index =
        under_slot index ["accepted"; "commitment"]

      let accepted_header_status index =
        under_slot index ["accepted"; "status"]

      let others index = under_slot index ["others"]

      let other_header_status index commitment =
        let commitment_repr = Cryptobox.Commitment.to_b58check commitment in
        under_slot index ["others"; commitment_repr; "status"]
    end

    module Profile = struct
      let root = "profiles"

      let profiles = [root]

      let profile profile = [root; encode_profile profile]
    end
  end

  (* [add_slot_by_commitment node_store cryptobox slot commitment] stores
     [slot], serialized with the slot size found in the parameters of
     [cryptobox], at the slot path of [commitment], then emits the
     [stored_slot_content] event. *)
  let add_slot_by_commitment node_store cryptobox slot commitment =
    let open Lwt_syntax in
    let Cryptobox.{slot_size; _} = Cryptobox.parameters cryptobox in
    let slot_path = Path.Commitment.slot commitment ~slot_size in
    let serialized = encode_slot slot_size slot in
    let* () = set ~msg:"Slot stored" node_store.store slot_path serialized in
    Event.(emit stored_slot_content commitment)

  (* [associate_slot_id_with_commitment node_store commitment slot_id] records
     that [commitment] is known for [slot_id], unless already recorded: an
     empty value is written at the commitment's header path and the status
     [`Unseen] at the "others" status path of the slot.

     NOTE(review): the assertion below expects both paths to be present or
     absent together. [add_slot_headers] writes the header path and, for
     accepted headers, removes the "others" status path, so calling this
     function afterwards for the same pair would trip the assertion. To be
     confirmed against callers. *)
  let associate_slot_id_with_commitment node_store commitment slot_id =
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4528
       Improve the implementation of this handler.
    *)
    let open Lwt_syntax in
    let store = node_store.store in
    let header_path = Path.Commitment.header commitment slot_id in
    let levels_path = Path.Level.other_header_status slot_id commitment in
    let commit_msg written_path =
      Path.to_string ~prefix:"associate_slot_id_with_commitment:" written_path
    in
    let* known_levels = mem store levels_path in
    let* known_header = mem store header_path in
    (* An invariant that should hold for the storage. *)
    assert (Bool.equal known_levels known_header) ;
    if known_levels then return_unit
    else
      (* The path allows to reconstruct the data. *)
      let* () = set ~msg:(commit_msg header_path) store header_path "" in
      set
        ~msg:(commit_msg levels_path)
        store
        levels_path
        (encode_header_status `Unseen)

  (* Tells whether the store has an entry at the slot path of [commitment],
     for the slot size found in the parameters of [cryptobox]. *)
  let exists_slot_by_commitment node_store cryptobox commitment =
    let Cryptobox.{slot_size; _} = Cryptobox.parameters cryptobox in
    Path.Commitment.slot commitment ~slot_size |> mem node_store.store

  (* [find_slot_by_commitment node_store cryptobox commitment] returns the
     decoded slot stored for [commitment], [None] when the store has no entry
     at the slot path, or a decoding error when the stored value cannot be
     read back. *)
  let find_slot_by_commitment node_store cryptobox commitment =
    let open Lwt_result_syntax in
    let Cryptobox.{slot_size; _} = Cryptobox.parameters cryptobox in
    let slot_path = Path.Commitment.slot commitment ~slot_size in
    let*! stored = find node_store.store slot_path in
    match stored with
    | None -> return None
    | Some raw ->
        let*? slot = decode_slot slot_size raw in
        return (Some slot)

  (* [get_opt array i] is [Some array.(i)] when [i] is a valid index of
     [array], and [None] otherwise. *)
  let get_opt array i =
    if i < 0 || i >= Array.length array then None else Some (Array.get array i)

  (** [shards_to_attestors committee] takes a committee [Committee_cache.committee]
      and returns a function that, given a shard index, yields the pkh of its
      attestor for that level, or [None] when the index is outside of the array
      built from the committee. *)
  let shards_to_attestors committee =
    let rec do_n ~n f acc = if n <= 0 then acc else do_n ~n:(n - 1) f (f acc) in
    let to_array committee =
      (* We transform the map to a list *)
      Tezos_crypto.Signature.Public_key_hash.Map.bindings committee
      (* We sort the list in decreasing order w.r.t. to start_indices. The
         comparison goes through [Int.compare] rather than a subtraction, whose
         sign is wrong when the difference overflows. *)
      |> List.fast_sort (fun (_pkh1, shard_indices1) (_pkh2, shard_indices2) ->
             Int.compare
               shard_indices2.Committee_cache.start_index
               shard_indices1.Committee_cache.start_index)
         (* We fold on the sorted list, starting from bigger start_indices. *)
      |> List.fold_left
           (fun accu (pkh, Committee_cache.{start_index = _; offset}) ->
             (* We put in the accu list as many [pkh] occurrences as the number
                of shards this pkh should attest, namely, [offset]. *)
             do_n ~n:offset (fun acc -> pkh :: acc) accu)
           []
      (* We build an array from the list. The array indices coincide with shard
         indices. *)
      |> Array.of_list
    in
    let committee = to_array committee in
    fun index -> get_opt committee index

  (* This function publishes the shards of a commitment that is waiting for
     attestation on L1 if this node has those shards on disk and their proofs in
     memory.

     When no proofs are cached for [commitment], nothing is published and the
     function returns unit. Otherwise the committee is fetched with
     [level_committee] for the level [published_level + attestation_lag], and
     every shard read from the shard store is handed to the gossipsub worker as
     a [Publish_message] input.

     A shard that cannot be loaded is reported with the
     [loading_shard_data_failed] event and skipped. A loaded shard without
     attestor or without proof makes the function fail. *)
  let publish_slot_data ~level_committee node_store cryptobox proto_parameters
      commitment published_level slot_index =
    let open Lwt_result_syntax in
    match
      Shard_proofs_cache.find_opt node_store.in_memory_shard_proofs commitment
    with
    | None ->
        (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5676

            Recompute the proofs if we have shards on disk? Useful if the node
            restarted. *)
        return_unit
    | Some shard_proofs ->
        let attestation_level =
          Int32.(
            add
              published_level
              (of_int proto_parameters.Dal_plugin.attestation_lag))
        in
        let* committee = level_committee ~level:attestation_level in
        let attestor_of_shard = shards_to_attestors committee in
        let Cryptobox.{number_of_shards; _} = Cryptobox.parameters cryptobox in
        Shards.read_all node_store.shard_store commitment ~number_of_shards
        |> Seq_s.iter_ep (function
               (* Loading failures are logged and do not abort the iteration. *)
               | _, Error [Stored_data.Missing_stored_data s] ->
                   let*! () =
                     Event.(
                       emit
                         loading_shard_data_failed
                         ("Missing stored data " ^ s))
                   in
                   return_unit
               | _, Error err ->
                   let*! () =
                     Event.(
                       emit
                         loading_shard_data_failed
                         (Format.asprintf "%a" pp_print_trace err))
                   in
                   return_unit
               | (commitment, shard_index), Ok share -> (
                   (* The attestor and the proof are both looked up by shard
                      index. *)
                   match
                     ( attestor_of_shard shard_index,
                       get_opt shard_proofs shard_index )
                   with
                   | None, _ ->
                       (* NOTE(review): [failwith] is applied to a format
                          string here, so it is presumably the error-monad
                          variant and not [Stdlib.failwith]; confirm. *)
                       failwith
                         "Invariant broken: no attestor found for shard %d"
                         shard_index
                   | _, None ->
                       failwith
                         "Invariant broken: no shard proof found for shard %d"
                         shard_index
                   | Some pkh, Some shard_proof ->
                       let message = Gossipsub.{share; shard_proof} in
                       let topic = Gossipsub.{slot_index; pkh} in
                       let message_id =
                         Gossipsub.
                           {
                             commitment;
                             level = published_level;
                             slot_index;
                             shard_index;
                             pkh;
                           }
                       in
                       Gossipsub.Worker.(
                         Publish_message {message; topic; message_id}
                         |> app_input node_store.gs_worker) ;
                       return_unit))

  (* [add_slot_headers ~level_committee cryptobox proto_parameters ~block_level
     ~block_hash slot_headers node_store] records in the store every
     [(slot_header, status)] pair of [slot_headers]. [block_hash] is ignored.

     For each header, an empty value is first written at the header path of its
     commitment. Then:
     - if [status] is [Succeeded], the "others" status entry of the commitment
       is removed, the commitment and the status [`Waiting_attestation] are
       written at the "accepted" paths of the slot, and the shards of the slot
       are published with [publish_slot_data];
     - if [status] is [Failed], the status [`Not_selected] is written at the
       "others" status path.

     The published level of every header is asserted to be [block_level]. *)
  let add_slot_headers ~level_committee cryptobox proto_parameters ~block_level
      ~block_hash:_ slot_headers node_store =
    let open Lwt_result_syntax in
    let slots_store = node_store.store in
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4388
       Handle reorgs. *)
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4389
             https://gitlab.com/tezos/tezos/-/issues/4528
       Handle statuses evolution. *)
    List.iter_es
      (fun (slot_header, status) ->
        let Dal_plugin.{slot_index; commitment; published_level} =
          slot_header
        in
        (* This invariant should hold. *)
        assert (Int32.equal published_level block_level) ;
        let index = Services.Types.{slot_level = published_level; slot_index} in
        let header_path = Path.Commitment.header commitment index in
        (* The value is empty: the path itself carries the information. *)
        let*! () =
          set
            ~msg:(Path.to_string ~prefix:"add_slot_headers:" header_path)
            slots_store
            header_path
            ""
        in
        let others_path = Path.Level.other_header_status index commitment in
        match status with
        | Dal_plugin.Succeeded ->
            let commitment_path = Path.Level.accepted_header_commitment index in
            let status_path = Path.Level.accepted_header_status index in
            let data = encode_commitment commitment in
            (* Before adding the item in accepted path, we should remove it from
               others path, as it may appear there with an Unseen status. *)
            let*! () =
              remove
                ~msg:(Path.to_string ~prefix:"add_slot_headers:" others_path)
                slots_store
                others_path
            in
            let*! () =
              set
                ~msg:
                  (Path.to_string ~prefix:"add_slot_headers:" commitment_path)
                slots_store
                commitment_path
                data
            in
            let*! () =
              set
                ~msg:(Path.to_string ~prefix:"add_slot_headers:" status_path)
                slots_store
                status_path
                (encode_header_status `Waiting_attestation)
            in
            (* The only step of this function that can return an error. *)
            publish_slot_data
              ~level_committee
              node_store
              cryptobox
              proto_parameters
              commitment
              published_level
              slot_index
        | Dal_plugin.Failed ->
            let*! () =
              set
                ~msg:(Path.to_string ~prefix:"add_slot_headers:" others_path)
                slots_store
                others_path
                (encode_header_status `Not_selected)
            in
            return_unit)
      slot_headers

  (* [update_slot_headers_attestation ~published_level ~number_of_slots store
     attested] updates the "accepted" status of every slot index in
     [0 -- (number_of_slots - 1)] at [published_level]: indices listed in
     [attested] get the status [`Attested]; the other ones get [`Unattested],
     but only if a status is already stored for them. *)
  let update_slot_headers_attestation ~published_level ~number_of_slots store
      attested =
    let open Lwt_syntax in
    let module Indices = Set.Make (Int) in
    let attested_indices =
      List.fold_left
        (fun indices slot_index -> Indices.add slot_index indices)
        Indices.empty
        attested
    in
    let attested_status = encode_header_status `Attested in
    let unattested_status = encode_header_status `Unattested in
    let update_status slot_index =
      let index = Services.Types.{slot_level = published_level; slot_index} in
      let status_path = Path.Level.accepted_header_status index in
      let msg =
        Path.to_string ~prefix:"update_slot_headers_attestation:" status_path
      in
      if Indices.mem slot_index attested_indices then
        set ~msg store status_path attested_status
      else
        let* previous_status = find store status_path in
        match previous_status with
        | Some _ -> set ~msg store status_path unattested_status
        | None ->
            (* There is no header that has been included in a block and
               selected for this index. So, the slot cannot be attested or
               unattested. *)
            return_unit
    in
    List.iter_s update_status (0 -- (number_of_slots - 1))

  (* Updates the attestation statuses of the slot headers published at level
     [block_level - attestation_lag], see [update_slot_headers_attestation]. *)
  let update_selected_slot_headers_statuses ~block_level ~attestation_lag
      ~number_of_slots attested node_store =
    let lag = Int32.of_int attestation_lag in
    update_slot_headers_attestation
      ~published_level:(Int32.sub block_level lag)
      ~number_of_slots
      node_store.store
      attested

  (* Returns the commitment stored at the "accepted" commitment path of the
     slot identified by [level] and [slot_index]. Fails with [`Not_found] when
     the store has no such entry, or with a decoding error when the stored
     value is not a valid commitment. *)
  let get_commitment_by_published_level_and_index ~level ~slot_index node_store
      =
    let open Lwt_result_syntax in
    let index = Services.Types.{slot_level = level; slot_index} in
    let commitment_path = Path.Level.accepted_header_commitment index in
    let*! commitment_str_opt = find node_store.store commitment_path in
    match commitment_str_opt with
    | None -> fail `Not_found
    | Some c_str -> Lwt.return (decode_commitment c_str)

  (* Lists the entries found under the profiles path and decodes the name of
     each of them as a profile. The result is an error as soon as one name
     cannot be decoded. *)
  let get_profiles node_store =
    let open Lwt_syntax in
    let* entries = list node_store.store Path.Profile.profiles in
    entries
    |> List.map_e (fun (encoded_profile, _tree) ->
           decode_profile encoded_profile)
    |> return

  (* [add_profile proto_parameters node_store profile] registers [profile] in
     the store, as an empty value written at the profile's path. For an
     attestor profile, a [Join] input is then sent to the gossipsub worker for
     every slot index in [0 -- (number_of_slots - 1)], each topic pairing the
     slot index with the attestor's public key hash. *)
  let add_profile Dal_plugin.{number_of_slots; _} node_store profile =
    let open Lwt_syntax in
    let path = Path.Profile.profile profile in
    let* () =
      set
        ~msg:(Printf.sprintf "New profile added: %s" (Path.to_string path))
        node_store.store
        path
        ""
    in
    match profile with
    | Attestor pkh ->
        (* One topic per slot index for this attestor. *)
        List.iter
          (fun slot_index ->
            Join Gossipsub.{slot_index; pkh}
            |> Gossipsub.Worker.(app_input node_store.gs_worker))
          Utils.Infix.(0 -- (number_of_slots - 1)) ;
        return_unit

  (** [filter_indexes ?slot_level ?slot_index indexes] decodes the first
      component of every element of [indexes] as a slot identifier, and keeps
      the identifiers whose level and index equal the given [slot_level] and
      [slot_index]. An omitted filter accepts every value. The result is an
      error if one of the identifiers cannot be decoded. *)
  let filter_indexes =
    let accepts filter v =
      match filter with None -> true | Some f -> f = v
    in
    fun ?slot_level ?slot_index indexes ->
      let open Result_syntax in
      let* slot_ids =
        List.map_e (fun (encoded_id, _) -> decode_slot_id encoded_id) indexes
      in
      let selected {Services.Types.slot_level = l; slot_index = i} =
        accepts slot_level l && accepts slot_index i
      in
      return (List.filter selected slot_ids)

  (* [get_accepted_headers ~skip_commitment slot_ids store accu] conses onto
     [accu] one header per slot identifier of [slot_ids] that has both an
     "accepted" commitment and an "accepted" status in [store], and whose
     stored commitment is kept by [skip_commitment] (which answers [`Skip] or
     [`Keep commitment], or an error). Slots with a missing entry are silently
     left out.

     See doc-string in {!Legacy.Path.Level} for the notion of "accepted"
     header. *)
  let get_accepted_headers ~skip_commitment slot_ids store accu =
    let open Lwt_result_syntax in
    let add_accepted acc slot_id =
      let*! commitment_opt =
        find store (Path.Level.accepted_header_commitment slot_id)
      in
      match commitment_opt with
      | None -> return acc
      | Some read_commitment -> (
          let*? decision = skip_commitment read_commitment in
          match decision with
          | `Skip -> return acc
          | `Keep commitment -> (
              let*! status_opt =
                find store (Path.Level.accepted_header_status slot_id)
              in
              match status_opt with
              | None -> return acc
              | Some status_str ->
                  let*? status = decode_header_status status_str in
                  let header =
                    {
                      Services.Types.slot_id;
                      commitment;
                      status = (status :> Services.Types.header_status);
                    }
                  in
                  return (header :: acc)))
    in
    List.fold_left_es add_accepted accu slot_ids

  (* Same as [get_accepted_headers], keeping only the slots whose stored
     "accepted" commitment is the encoding of [commitment].

     See doc-string in {!Legacy.Path.Level} for the notion of "accepted"
     header. *)
  let get_accepted_headers_of_commitment commitment slot_ids store accu =
    let encoded_commitment = encode_commitment commitment in
    let skip_commitment read_commitment =
      if String.equal read_commitment encoded_commitment then
        Result_syntax.return (`Keep commitment)
      else Result_syntax.return `Skip
    in
    get_accepted_headers ~skip_commitment slot_ids store accu

  (* [get_other_headers_of_identified_commitment commitment slot_id store acc]
     conses onto [acc] the header of [commitment] for [slot_id] if [store] has
     an "others" status for this pair; [acc] is returned unchanged otherwise.

     See doc-string in {!Legacy.Path.Level} for the notion of "other(s)"
     header. *)
  let get_other_headers_of_identified_commitment commitment slot_id store acc =
    let open Lwt_result_syntax in
    let status_path = Path.Level.other_header_status slot_id commitment in
    let*! status_opt = find store status_path in
    match status_opt with
    | Some status_str ->
        let*? status = decode_header_status status_str in
        return ({Services.Types.slot_id; commitment; status} :: acc)
    | None -> return acc

  (* Accumulates onto [accu] the "others" headers of [commitment] for every
     slot identifier of [slot_ids].

     See doc-string in {!Legacy.Path.Level} for the notion of "other(s)"
     header. *)
  let get_other_headers_of_commitment commitment slot_ids store accu =
    let add_other acc slot_id =
      get_other_headers_of_identified_commitment commitment slot_id store acc
    in
    List.fold_left_es add_other accu slot_ids

  (* [get_commitment_headers commitment ?slot_level ?slot_index node_store]
     returns the headers known for [commitment], "others" and "accepted" alike,
     restricted to the slot identifiers matching the optional [slot_level] and
     [slot_index]. *)
  let get_commitment_headers commitment ?slot_level ?slot_index node_store =
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4528
       Improve the implementation of this handler.
    *)
    let open Lwt_result_syntax in
    let store = node_store.store in
    (* The slot identifiers known for [commitment] are the names of the
       entries under its headers path. *)
    let*! indexes = list store (Path.Commitment.headers commitment) in
    (* Only the identifiers matching [slot_level] and [slot_index] are kept. *)
    let*? slot_ids = filter_indexes ?slot_level ?slot_index indexes in
    let* others =
      get_other_headers_of_commitment commitment slot_ids store []
    in
    get_accepted_headers_of_commitment commitment slot_ids store others

  (* [get_other_headers slot_ids store accu] accumulates onto [accu], for every
     slot identifier of [slot_ids], the headers of all the commitments listed
     under the "others" path of the slot.

     See doc-string in {!Legacy.Path.Level} for the notion of "other(s)"
     header. *)
  let get_other_headers slot_ids store accu =
    let open Lwt_result_syntax in
    let collect_for_slot acc slot_id =
      let*! commitments_with_statuses =
        list store (Path.Level.others slot_id)
      in
      List.fold_left_es
        (fun acc (encoded_commitment, _status_tree) ->
          let*? commitment = decode_commitment encoded_commitment in
          get_other_headers_of_identified_commitment
            commitment
            slot_id
            store
            acc)
        acc
        commitments_with_statuses
    in
    List.fold_left_es collect_for_slot accu slot_ids

  (* [get_published_level_headers ~published_level ?header_status node_store]
     returns the headers, "others" and "accepted" alike, of all the slot
     indices stored for [published_level]. When [header_status] is given, only
     the headers with this status are returned.

     NOTE(review): the names of the entries listed under the level are parsed
     with [int_of_string], which raises on a name that is not an integer;
     presumably only [Path.Level] writes there, to be confirmed. *)
  let get_published_level_headers ~published_level ?header_status node_store =
    let open Lwt_result_syntax in
    let store = node_store.store in
    (* Get the list of slots indices from the given level. *)
    let*! slots_indices =
      list store @@ Path.Level.slots_indices published_level
    in
    (* Build the list of slot IDs. *)
    let slot_ids =
      List.rev_map
        (fun (index, _tree) ->
          {
            Services.Types.slot_level = published_level;
            slot_index = int_of_string index;
          })
        slots_indices
    in
    let* accu = get_other_headers slot_ids store [] in
    let* accu =
      (* Every accepted commitment that decodes is kept. *)
      let skip_commitment c =
        let open Result_syntax in
        let* commit = decode_commitment c in
        return @@ `Keep commit
      in
      get_accepted_headers ~skip_commitment slot_ids store accu
    in
    (* TODO: https://gitlab.com/tezos/tezos/-/issues/4541
       Enable the same filtering for GET /commitments/<commitment>/headers
       (function get_commitment_headers above). Push this filtering into the result
       construction? *)
    return
    @@
    match header_status with
    | None -> accu
    | Some hs ->
        List.filter (fun header -> header.Services.Types.status = hs) accu

  (* TODO: https://gitlab.com/tezos/tezos/-/issues/4641

     handle with_proof flag -> store proofs on disk? *)

  (* [save_shard_proofs node_store commitment shard_proofs] binds [commitment]
     to [shard_proofs] in the in-memory proofs cache of [node_store], replacing
     any previous binding. *)
  let save_shard_proofs {in_memory_shard_proofs; _} commitment shard_proofs =
    Shard_proofs_cache.replace in_memory_shard_proofs commitment shard_proofs
end
back to top