Revision 02e7f855cb4dcc54e2f1e1de41e8720cd1a9ed7f authored by pecornilleau on 04 April 2024, 16:11:12 UTC, committed by pecornilleau on 04 April 2024, 17:15:54 UTC
1 parent eaf7214
Raw File
sc_rollup_node.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2021 Nomadic Labs <contact@nomadic-labs.com>                *)
(* Copyright (c) 2023 Marigold <contact@marigold.dev>                        *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

type 'a known = Unknown | Known of 'a

type purpose = Operating | Batching | Cementing | Recovering

type operation_kind =
  | Publish
  | Add_messages
  | Cement
  | Timeout
  | Refute
  | Recover
  | Execute_outbox_message

let operation_kind_of_string = function
  | "publish" -> Some Publish
  | "add_messages" -> Some Add_messages
  | "cement" -> Some Cement
  | "timeout" -> Some Timeout
  | "refute" -> Some Refute
  | "recover" -> Some Recover
  | "execute_outbox_message" -> Some Execute_outbox_message
  | _ -> None

let operation_kind_of_string_exn s =
  match operation_kind_of_string s with
  | Some p -> p
  | None -> invalid_arg ("operation_kind_of_string " ^ s)

let string_of_operation_kind = function
  | Publish -> "publish"
  | Add_messages -> "add_messages"
  | Cement -> "cement"
  | Timeout -> "timeout"
  | Refute -> "refute"
  | Recover -> "recover"
  | Execute_outbox_message -> "execute_outbox_message"

type mode =
  | Batcher
  | Custom of operation_kind list
  | Maintenance
  | Observer
  | Operator
  | Accuser
  | Bailout

let string_of_mode = function
  | Observer -> "observer"
  | Batcher -> "batcher"
  | Maintenance -> "maintenance"
  | Operator -> "operator"
  | Custom op_kinds ->
      if op_kinds = [] then "custom"
      else
        "custom:"
        ^ String.concat "," (List.map string_of_operation_kind op_kinds)
  | Accuser -> "accuser"
  | Bailout -> "bailout"

type history_mode = Archive | Full

let string_of_history_mode = function Archive -> "archive" | Full -> "full"

type argument =
  | Data_dir of string
  | Rpc_addr of string
  | Rpc_port of int
  | Log_kernel_debug
  | Log_kernel_debug_file of string
  | Metrics_addr of string
  | Injector_attempts of int
  | Boot_sector_file of string
  | Dac_observer of Dac_node.t
  | Loser_mode of string
  | No_degraded
  | Gc_frequency of int
  | History_mode of history_mode
  | Dal_node of Dal_node.t
  | Mode of mode
  | Rollup of string
  | Pre_images_endpoint of string

let make_argument = function
  | Data_dir dir -> ["--data-dir"; dir]
  | Rpc_addr host -> ["--rpc-addr"; host]
  | Rpc_port port -> ["--rpc-port"; string_of_int port]
  | Log_kernel_debug -> ["--log-kernel-debug"]
  | Log_kernel_debug_file file -> ["--log-kernel-debug-file"; file]
  | Metrics_addr addr -> ["--metrics-addr"; addr]
  | Injector_attempts n -> ["--injector-attempts"; string_of_int n]
  | Boot_sector_file file -> ["--boot-sector-file"; file]
  | Dac_observer dac_node -> ["--dac-observer"; Dac_node.endpoint dac_node]
  | Loser_mode loser -> ["--loser-mode"; loser]
  | No_degraded -> ["--no-degraded"]
  | Gc_frequency freq -> ["--gc-frequency"; string_of_int freq]
  | History_mode mode -> ["--history-mode"; string_of_history_mode mode]
  | Dal_node dal_node -> ["--dal-node"; Dal_node.rpc_endpoint dal_node]
  | Mode mode -> ["--mode"; string_of_mode mode]
  | Rollup addr -> ["--rollup"; addr]
  | Pre_images_endpoint addr -> ["--pre-images-endpoint"; addr]

let is_redundant = function
  | Data_dir _, Data_dir _
  | Rpc_addr _, Rpc_addr _
  | Rpc_port _, Rpc_port _
  | Log_kernel_debug, Log_kernel_debug
  | Log_kernel_debug_file _, Log_kernel_debug_file _
  | Injector_attempts _, Injector_attempts _
  | Boot_sector_file _, Boot_sector_file _
  | Dac_observer _, Dac_observer _
  | Loser_mode _, Loser_mode _
  | No_degraded, No_degraded
  | Gc_frequency _, Gc_frequency _
  | History_mode _, History_mode _
  | Dal_node _, Dal_node _
  | Mode _, Mode _
  | Rollup _, Rollup _
  | Pre_images_endpoint _, Pre_images_endpoint _ ->
      true
  | Metrics_addr addr1, Metrics_addr addr2 -> addr1 = addr2
  | Metrics_addr _, _
  | Data_dir _, _
  | Rpc_addr _, _
  | Rpc_port _, _
  | Log_kernel_debug, _
  | Log_kernel_debug_file _, _
  | Injector_attempts _, _
  | Boot_sector_file _, _
  | Dac_observer _, _
  | Loser_mode _, _
  | No_degraded, _
  | Gc_frequency _, _
  | History_mode _, _
  | Dal_node _, _
  | Mode _, _
  | Rollup _, _
  | Pre_images_endpoint _, _ ->
      false

let make_arguments arguments = List.flatten (List.map make_argument arguments)

let add_missing_argument arguments argument =
  if List.exists (fun arg -> is_redundant (arg, argument)) arguments then
    arguments
  else argument :: arguments

(** join both list, if one argument exists in both list, then the one
    of [extra_arguments] is used. This is so arguments added in
    [~extra_arguments] for {!run} like function takes precedent from
    default arguments. *)
let add_missing_arguments ~extra_arguments ~arguments =
  List.fold_left add_missing_argument extra_arguments arguments

let optional_switch arg switch = if switch then [arg] else []

let optional_arg const = function None -> [] | Some x -> [const x]

module Parameters = struct
  type persistent_state = {
    data_dir : string;
    base_dir : string;
    operators : (purpose * string) list;
    default_operator : string option;
    metrics_addr : string option;
    metrics_port : int;
    rpc_host : string;
    rpc_port : int;
    mode : mode;
    dal_node : Dal_node.t option;
    loser_mode : string option;
    allow_degraded : bool;
    gc_frequency : int;
    history_mode : history_mode;
    password_file : string option;
    mutable endpoint : Client.endpoint;
    mutable pending_ready : unit option Lwt.u list;
    mutable pending_level : (int * int option Lwt.u) list;
    runner : Runner.t option;
  }

  type session_state = {mutable ready : bool; mutable level : int known}

  let base_default_name = "sc-rollup-node"

  let default_colors = Log.Color.[|FG.magenta; FG.green; FG.yellow; FG.cyan|]
end

open Parameters
include Daemon.Make (Parameters)

let mode_of_string s =
  match String.split_on_char ':' s with
  | ["custom"; kinds] ->
      let operation_kinds_strs = String.split_on_char ',' kinds in
      let operation_kinds =
        List.map operation_kind_of_string_exn operation_kinds_strs
      in
      Custom operation_kinds
  | _ -> (
      match String.lowercase_ascii s with
      | "observer" -> Observer
      | "accuser" -> Accuser
      | "bailout" -> Bailout
      | "batcher" -> Batcher
      | "maintenance" -> Maintenance
      | "operator" -> Operator
      | "custom" -> Custom []
      | _ -> invalid_arg (Format.sprintf "%S is not an existing mode" s))

let string_of_history_mode = function Archive -> "archive" | Full -> "full"

let check_error ?exit_code ?msg sc_node =
  match sc_node.status with
  | Not_running ->
      Test.fail
        "Smart rollup node %s is not running, it has no stderr"
        (name sc_node)
  | Running {process; _} -> Process.check_error ?exit_code ?msg process

let wait sc_node =
  match sc_node.status with
  | Not_running ->
      Test.fail
        "Smart rollup node %s is not running, cannot wait for it to terminate"
        (name sc_node)
  | Running {process; _} -> Process.wait process

let process node =
  match node.status with
  | Running {process; _} -> Some process
  | Not_running -> None

let write_in_stdin rollup_node str =
  match rollup_node.status with
  | Running {stdin; _} -> Lwt_io.write stdin Format.(sprintf "%s\n%!" str)
  | Not_running ->
      Test.fail
        "Smart rollup node %s is not running, cannot write in stdin"
        (name rollup_node)

let color sc_node = sc_node.color

let rpc_host sc_node = sc_node.persistent_state.rpc_host

let rpc_port sc_node = sc_node.persistent_state.rpc_port

let metrics node =
  ( Option.value
      ~default:Constant.default_host
      node.persistent_state.metrics_addr,
    node.persistent_state.metrics_port )

let endpoint sc_node =
  Printf.sprintf "http://%s:%d" (rpc_host sc_node) (rpc_port sc_node)

let data_dir sc_node = sc_node.persistent_state.data_dir

let base_dir sc_node = sc_node.persistent_state.base_dir

let string_of_purpose = function
  | Operating -> "operating"
  | Batching -> "batching"
  | Cementing -> "cementing"
  | Recovering -> "recovering"

(* Extracts operators from node state, handling custom mode, and
   formats them as "purpose:operator". Includes default operator if present. *)
let operators_params rollup_node =
  List.fold_left
    (fun acc (purpose, operator) ->
      (string_of_purpose purpose ^ ":" ^ operator) :: acc)
    (Option.to_list rollup_node.persistent_state.default_operator)
    rollup_node.persistent_state.operators

let make_command_arguments ?password_file node =
  [
    "--endpoint";
    Client.string_of_endpoint ~hostname:true node.persistent_state.endpoint;
    "--base-dir";
    base_dir node;
  ]
  @ Cli_arg.optional_arg "password-filename" Fun.id password_file

let spawn_command sc_node args =
  Process.spawn
    ?runner:sc_node.persistent_state.runner
    ~name:sc_node.name
    ~color:sc_node.color
    sc_node.path
  @@ make_command_arguments sc_node
  @ args

let runlike_argument rollup_node =
  let rollup_node_state = rollup_node.persistent_state in
  [
    Data_dir (data_dir rollup_node);
    (let metrics_addr, metrics_port = metrics rollup_node in
     Metrics_addr (metrics_addr ^ ":" ^ string_of_int metrics_port));
    Rpc_addr (rpc_host rollup_node);
    Rpc_port (rpc_port rollup_node);
    History_mode rollup_node_state.history_mode;
    Gc_frequency rollup_node_state.gc_frequency;
  ]
  @ optional_arg (fun s -> Loser_mode s) rollup_node_state.loser_mode
  @ optional_switch No_degraded (not rollup_node_state.allow_degraded)
  @ optional_arg (fun n -> Dal_node n) rollup_node_state.dal_node

let node_args rollup_node rollup_address =
  let mode = string_of_mode rollup_node.persistent_state.mode in
  ( mode,
    ["for"; rollup_address; "with"; "operators"] @ operators_params rollup_node,
    runlike_argument rollup_node )

let legacy_node_args rollup_node rollup_address =
  Mode rollup_node.persistent_state.mode :: Rollup rollup_address
  :: runlike_argument rollup_node

let spawn_config_init ?(force = false) sc_node rollup_address =
  let mode, cmd, args = node_args sc_node rollup_address in
  spawn_command sc_node @@ ["init"; mode; "config"] @ cmd @ make_arguments args
  @ Cli_arg.optional_switch "force" force

let config_init ?force sc_node rollup_address =
  let process = spawn_config_init ?force sc_node rollup_address in
  let* output = Process.check_and_read_stdout process in
  match
    output =~* rex "Smart rollup node configuration written in ([^\n]*)"
  with
  | None -> failwith "Smart rollup node configuration initialization failed"
  | Some filename -> return filename

module Config_file = struct
  let filename sc_node = sf "%s/config.json" @@ data_dir sc_node

  let read sc_node = JSON.parse_file (filename sc_node)

  let write sc_node config = JSON.encode_to_file (filename sc_node) config

  let update sc_node update = read sc_node |> update |> write sc_node
end

let trigger_ready sc_node value =
  let pending = sc_node.persistent_state.pending_ready in
  sc_node.persistent_state.pending_ready <- [] ;
  List.iter (fun pending -> Lwt.wakeup_later pending value) pending

let set_ready sc_node =
  (match sc_node.status with
  | Not_running -> ()
  | Running status -> status.session_state.ready <- true) ;
  trigger_ready sc_node (Some ())

let check_event ?timeout ?where sc_node name promise =
  let promise = Lwt.map Result.ok promise in
  let* result =
    match timeout with
    | None -> promise
    | Some timeout ->
        Lwt.pick
          [
            promise;
            (let* () = Lwt_unix.sleep timeout in
             Lwt.return_error ());
          ]
  in
  match result with
  | Ok (Some x) -> return x
  | Ok None ->
      raise
        (Terminated_before_event {daemon = sc_node.name; event = name; where})
  | Error () ->
      Format.ksprintf
        failwith
        "Timeout waiting for event %s of %s"
        name
        sc_node.name

let wait_for_ready sc_node =
  match sc_node.status with
  | Running {session_state = {ready = true; _}; _} -> unit
  | Not_running | Running {session_state = {ready = false; _}; _} ->
      let promise, resolver = Lwt.task () in
      sc_node.persistent_state.pending_ready <-
        resolver :: sc_node.persistent_state.pending_ready ;
      check_event sc_node "sc_rollup_node_is_ready.v0" promise

let update_level sc_node current_level =
  (match sc_node.status with
  | Not_running -> ()
  | Running status -> (
      match status.session_state.level with
      | Unknown -> status.session_state.level <- Known current_level
      | Known old_level ->
          status.session_state.level <- Known (max old_level current_level))) ;
  let pending = sc_node.persistent_state.pending_level in
  sc_node.persistent_state.pending_level <- [] ;
  List.iter
    (fun ((level, resolver) as pending) ->
      if current_level >= level then
        Lwt.wakeup_later resolver (Some current_level)
      else
        sc_node.persistent_state.pending_level <-
          pending :: sc_node.persistent_state.pending_level)
    pending

let wait_for_level ?timeout sc_node level =
  match sc_node.status with
  | Running {session_state = {level = Known current_level; _}; _}
    when current_level >= level ->
      return current_level
  | Not_running | Running _ ->
      let promise, resolver = Lwt.task () in
      sc_node.persistent_state.pending_level <-
        (level, resolver) :: sc_node.persistent_state.pending_level ;
      check_event
        ?timeout
        sc_node
        "smart_rollup_node_daemon_new_head_processed.v0"
        ~where:("level >= " ^ string_of_int level)
        promise

let unsafe_wait_sync ?timeout sc_node =
  let* node_level =
    match sc_node.persistent_state.endpoint with
    | Node node -> Node.get_level node
    | endpoint ->
        let* level =
          RPC_core.call (Client.as_foreign_endpoint endpoint)
          @@ RPC.get_chain_block_helper_current_level ()
        in
        return level.level
  in
  wait_for_level ?timeout sc_node node_level

let wait_sync sc_node ~timeout = unsafe_wait_sync sc_node ~timeout

let handle_event sc_node {name; value; timestamp = _} =
  match name with
  | "smart_rollup_node_is_ready.v0" -> set_ready sc_node
  | "smart_rollup_node_daemon_new_head_processed.v0" ->
      let level = JSON.(value |-> "level" |> as_int) in
      update_level sc_node level
  | "smart_rollup_node_daemon_new_heads_processed.v0" ->
      let level = JSON.(value |-> "to" |> as_int) in
      update_level sc_node level
  | _ -> ()

let create_with_endpoint ?runner ?path ?name ?color ?data_dir ~base_dir
    ?event_pipe ?metrics_addr ?metrics_port ?(rpc_host = Constant.default_host)
    ?rpc_port ?(operators = []) ?default_operator
    ?(dal_node : Dal_node.t option) ?loser_mode ?(allow_degraded = false)
    ?(gc_frequency = 1) ?(history_mode = Full) ?password_file mode endpoint =
  let name = match name with None -> fresh_name () | Some name -> name in
  let data_dir =
    match data_dir with None -> Temp.dir name | Some dir -> dir
  in
  let rpc_port =
    match rpc_port with None -> Port.fresh () | Some port -> port
  in
  let metrics_port =
    match metrics_port with None -> Port.fresh () | Some port -> port
  in
  let path =
    Option.value ~default:(Uses.path Constant.octez_smart_rollup_node) path
  in
  let sc_node =
    create
      ~path
      ~name
      ?runner
      ?color
      ?event_pipe
      {
        data_dir;
        base_dir;
        metrics_addr;
        metrics_port;
        rpc_host;
        rpc_port;
        operators;
        default_operator;
        mode;
        endpoint;
        dal_node;
        loser_mode;
        allow_degraded;
        gc_frequency;
        history_mode;
        password_file;
        pending_ready = [];
        pending_level = [];
        runner;
      }
  in
  on_event sc_node (handle_event sc_node) ;
  sc_node

let create ?runner ?path ?name ?color ?data_dir ~base_dir ?event_pipe
    ?metrics_addr ?metrics_port ?rpc_host ?rpc_port ?operators ?default_operator
    ?dal_node ?loser_mode ?allow_degraded ?gc_frequency ?history_mode
    ?password_file mode (node : Node.t) =
  create_with_endpoint
    ?runner
    ?path
    ?name
    ?color
    ?data_dir
    ~base_dir
    ?event_pipe
    ?metrics_addr
    ?metrics_port
    ?rpc_host
    ?rpc_port
    ?operators
    ?default_operator
    ?dal_node
    ?loser_mode
    ?allow_degraded
    ?gc_frequency
    ?history_mode
    ?password_file
    mode
    (Node node)

let do_runlike_command ?event_level ?event_sections_levels ?password_file node
    arguments =
  if node.status <> Not_running then
    Test.fail "Smart contract rollup node %s is already running" node.name ;
  let on_terminate _status =
    trigger_ready node None ;
    unit
  in
  let arguments = make_command_arguments ?password_file node @ arguments in
  run
    ?runner:node.persistent_state.runner
    ?event_level
    ?event_sections_levels
    node
    {ready = false; level = Unknown}
    arguments
    ~on_terminate

let run ?(legacy = false) ?(restart = false) ?mode ?event_level
    ?event_sections_levels ?(wait_ready = true) ?password_file node
    rollup_address extra_arguments =
  let* () = if restart then terminate node else return () in
  let cmd =
    if legacy then
      let arguments = legacy_node_args node rollup_address in
      let arguments = add_missing_arguments ~arguments ~extra_arguments in
      ["run"] @ make_arguments arguments
    else
      let default_mode, cmd, arguments = node_args node rollup_address in
      let arguments = add_missing_arguments ~arguments ~extra_arguments in
      let final_mode =
        match mode with Some m -> string_of_mode m | None -> default_mode
      in
      ["run"; final_mode] @ cmd @ make_arguments arguments
  in
  let* () =
    do_runlike_command
      ?event_level
      ?event_sections_levels
      ?password_file
      node
      cmd
  in
  if wait_ready then wait_for_ready node else unit

let change_node_and_restart ?event_level sc_rollup_node rollup_address node =
  let* () = terminate sc_rollup_node in
  sc_rollup_node.persistent_state.endpoint <- Node node ;
  run ?event_level sc_rollup_node rollup_address []

let change_node_mode sc_rollup_node mode =
  {
    sc_rollup_node with
    persistent_state = {sc_rollup_node.persistent_state with mode};
  }

let dump_durable_storage ~sc_rollup_node ~dump ?(block = "head") () =
  let cmd =
    [
      "dump";
      "durable";
      "storage";
      "into";
      dump;
      "--data-dir";
      sc_rollup_node.persistent_state.data_dir;
      "--block";
      block;
    ]
  in
  let process = spawn_command sc_rollup_node cmd in
  Process.check process

let export_snapshot ?(compress_on_the_fly = false) ?(compact = false)
    sc_rollup_node dir =
  let process =
    spawn_command
      sc_rollup_node
      ([
         "snapshot";
         "export";
         "--dest";
         dir;
         "--data-dir";
         data_dir sc_rollup_node;
       ]
      @ Cli_arg.optional_switch "compress-on-the-fly" compress_on_the_fly
      @ Cli_arg.optional_switch "compact" compact)
  in
  let parse process =
    let* output = Process.check_and_read_stdout process in
    match output =~* rex "Snapshot exported to ([^\n]*)" with
    | None -> Test.fail "Snapshot export failed"
    | Some filename -> return filename
  in
  Runnable.{value = process; run = parse}

let import_snapshot ?(force = false) sc_rollup_node ~snapshot_file =
  let process =
    spawn_command
      sc_rollup_node
      ([
         "snapshot";
         "import";
         snapshot_file;
         "--data-dir";
         data_dir sc_rollup_node;
       ]
      @ Cli_arg.optional_switch "force" force)
  in
  Runnable.{value = process; run = Process.check}

let as_rpc_endpoint (t : t) =
  let state = t.persistent_state in
  let scheme = "http" in
  Endpoint.{scheme; host = state.rpc_host; port = state.rpc_port}

module RPC = struct
  module RPC_callers : RPC_core.CALLERS with type uri_provider := t = struct
    let call ?rpc_hooks ?log_request ?log_response_status ?log_response_body
        node rpc =
      RPC_core.call
        ?rpc_hooks
        ?log_request
        ?log_response_status
        ?log_response_body
        (as_rpc_endpoint node)
        rpc

    let call_raw ?rpc_hooks ?log_request ?log_response_status ?log_response_body
        node rpc =
      RPC_core.call_raw
        ?rpc_hooks
        ?log_request
        ?log_response_status
        ?log_response_body
        (as_rpc_endpoint node)
        rpc

    let call_json ?rpc_hooks ?log_request ?log_response_status
        ?log_response_body node rpc =
      RPC_core.call_json
        ?rpc_hooks
        ?log_request
        ?log_response_status
        ?log_response_body
        (as_rpc_endpoint node)
        rpc
  end

  include RPC_callers
end
back to top