Revision bc545429ddf8494c22cc1458dce1233fc53780ba authored by Diane Gallois-Wong on 14 June 2023, 12:40:14 UTC, committed by Diane Gallois-Wong on 15 June 2023, 11:50:07 UTC
1 parent 1c6c675
node.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2021 Nomadic Labs <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
open Cli_arg
type history_mode = Archive | Full of int option | Rolling of int option
type media_type = Json | Binary | Any
let string_of_media_type = function
| Any -> "any"
| Binary -> "binary"
| Json -> "json"
type tls_config = {certificate_path : string; key_path : string}
type argument =
| Network of string
| History_mode of history_mode
| Expected_pow of int
| Singleprocess
| Bootstrap_threshold of int
| Synchronisation_threshold of int
| Sync_latency of int
| Connections of int
| Private_mode
| Disable_p2p_maintenance
| Disable_p2p_swap
| Peer of string
| No_bootstrap_peers
| Disable_operations_precheck
| Media_type of media_type
| Metadata_size_limit of int option
| Metrics_addr of string
| Cors_origin of string
| Disable_mempool
| Version
let make_argument = function
| Network x -> ["--network"; x]
| History_mode Archive -> ["--history-mode"; "archive"]
| History_mode (Full None) -> ["--history-mode"; "full"]
| History_mode (Full (Some i)) ->
["--history-mode"; "full:" ^ string_of_int i]
| History_mode (Rolling None) -> ["--history-mode"; "rolling"]
| History_mode (Rolling (Some i)) ->
["--history-mode"; "rolling:" ^ string_of_int i]
| Expected_pow x -> ["--expected-pow"; string_of_int x]
| Singleprocess -> ["--singleprocess"]
| Bootstrap_threshold x -> ["--bootstrap-threshold"; string_of_int x]
| Synchronisation_threshold x ->
["--synchronisation-threshold"; string_of_int x]
| Sync_latency x -> ["--sync-latency"; string_of_int x]
| Connections x -> ["--connections"; string_of_int x]
| Private_mode -> ["--private-mode"]
| Disable_p2p_maintenance -> ["--disable-p2p-maintenance"]
| Disable_p2p_swap -> ["--disable-p2p-swap"]
| Peer x -> ["--peer"; x]
| No_bootstrap_peers -> ["--no-bootstrap-peers"]
| Disable_operations_precheck -> ["--disable-mempool-precheck"]
| Media_type media_type -> ["--media-type"; string_of_media_type media_type]
| Metadata_size_limit None -> ["--metadata-size-limit"; "unlimited"]
| Metadata_size_limit (Some i) -> ["--metadata-size-limit"; string_of_int i]
| Metrics_addr metrics_addr -> ["--metrics-addr"; metrics_addr]
| Cors_origin cors_origin -> ["--cors-origin"; cors_origin]
| Disable_mempool -> ["--disable-mempool"]
| Version -> ["--version"]
let make_arguments arguments = List.flatten (List.map make_argument arguments)
(** [true] if the two given arguments are the same type
and cannot be repeated. [false] otherwise.
*)
let is_redundant = function
| Network _, Network _
| History_mode _, History_mode _
| Expected_pow _, Expected_pow _
| Singleprocess, Singleprocess
| Bootstrap_threshold _, Bootstrap_threshold _
| Synchronisation_threshold _, Synchronisation_threshold _
| Sync_latency _, Sync_latency _
| Connections _, Connections _
| Private_mode, Private_mode
| Disable_p2p_maintenance, Disable_p2p_maintenance
| Disable_p2p_swap, Disable_p2p_swap
| No_bootstrap_peers, No_bootstrap_peers
| Disable_operations_precheck, Disable_operations_precheck
| Media_type _, Media_type _
| Metadata_size_limit _, Metadata_size_limit _
| Version, Version ->
true
| Metrics_addr addr1, Metrics_addr addr2 -> addr1 = addr2
| Peer peer1, Peer peer2 -> peer1 = peer2
| Network _, _
| History_mode _, _
| Expected_pow _, _
| Singleprocess, _
| Bootstrap_threshold _, _
| Synchronisation_threshold _, _
| Sync_latency _, _
| Connections _, _
| Private_mode, _
| Disable_p2p_maintenance, _
| Disable_p2p_swap, _
| No_bootstrap_peers, _
| Disable_operations_precheck, _
| Media_type _, _
| Metadata_size_limit _, _
| Peer _, _
| Metrics_addr _, _
| Cors_origin _, _
| Disable_mempool, _
| Version, _ ->
false
type 'a known = Unknown | Known of 'a
module Parameters = struct
type persistent_state = {
data_dir : string;
mutable net_port : int;
advertised_net_port : int option;
rpc_host : string;
rpc_port : int;
rpc_tls : tls_config option;
allow_all_rpc : bool;
default_expected_pow : int;
mutable default_arguments : argument list;
mutable arguments : argument list;
mutable pending_ready : unit option Lwt.u list;
mutable pending_level : (int * int option Lwt.u) list;
mutable pending_identity : string option Lwt.u list;
runner : Runner.t option;
}
type session_state = {
mutable ready : bool;
mutable level : int known;
mutable identity : string known;
}
let base_default_name = "node"
let default_colors = Log.Color.[|FG.cyan; FG.magenta; FG.yellow; FG.green|]
end
open Parameters
include Daemon.Make (Parameters)
let check_error ?exit_code ?msg node =
match node.status with
| Not_running ->
Test.fail "node %s is not running, it has no stderr" (name node)
| Running {process; _} -> Process.check_error ?exit_code ?msg process
let wait node =
match node.status with
| Not_running ->
Test.fail
"node %s is not running, cannot wait for it to terminate"
(name node)
| Running {process; _} -> Process.wait process
let name node = node.name
let net_port node = node.persistent_state.net_port
let advertised_net_port node = node.persistent_state.advertised_net_port
let rpc_scheme node =
match node.persistent_state.rpc_tls with Some _ -> "https" | None -> "http"
let rpc_host node = node.persistent_state.rpc_host
let rpc_port node = node.persistent_state.rpc_port
let rpc_endpoint node =
sf "%s://%s:%d" (rpc_scheme node) (rpc_host node) (rpc_port node)
let data_dir node = node.persistent_state.data_dir
let runner node = node.persistent_state.runner
let spawn_command node =
Process.spawn
?runner:node.persistent_state.runner
~name:node.name
~color:node.color
node.path
let spawn_identity_generate ?expected_pow node =
spawn_command
node
[
"identity";
"generate";
"--data-dir";
node.persistent_state.data_dir;
string_of_int
(Option.value
expected_pow
~default:node.persistent_state.default_expected_pow);
]
let identity_generate ?expected_pow node =
spawn_identity_generate ?expected_pow node |> Process.check
let show_history_mode = function
| Archive -> "archive"
| Full None -> "full"
| Full (Some i) -> "full_" ^ string_of_int i
| Rolling None -> "rolling"
| Rolling (Some i) -> "rolling_" ^ string_of_int i
let add_missing_argument arguments argument =
if List.exists (fun arg -> is_redundant (arg, argument)) arguments then
arguments
else argument :: arguments
let add_default_arguments arguments =
let arguments =
(* Give a default value of "sandbox" to --network. *)
add_missing_argument arguments (Network "sandbox")
in
(* Give a default value of 0 to --expected-pow. *)
add_missing_argument arguments (Expected_pow 0)
let spawn_config_command command node arguments =
let arguments =
List.fold_left
add_missing_argument
arguments
node.persistent_state.arguments
in
(* Since arguments will be in the configuration file, we will not need them after this. *)
node.persistent_state.arguments <- [] ;
spawn_command
node
("config" :: command :: "--data-dir" :: node.persistent_state.data_dir
:: make_arguments arguments)
let spawn_config_init = spawn_config_command "init"
let spawn_config_update = spawn_config_command "update"
let spawn_config_reset node arguments =
node.persistent_state.arguments <- node.persistent_state.default_arguments ;
spawn_config_command "reset" node arguments
let config_init node arguments =
spawn_config_init node arguments |> Process.check
let config_update node arguments =
spawn_config_update node arguments |> Process.check
let config_reset node arguments =
spawn_config_reset node arguments |> Process.check
let config_show node =
let* output =
spawn_command
node
["config"; "show"; "--data-dir"; node.persistent_state.data_dir]
|> Process.check_and_read_stdout
in
return (JSON.parse ~origin:"config" output)
module Config_file = struct
let filename node = sf "%s/config.json" @@ data_dir node
let read node = JSON.parse_file (filename node)
let write node config = JSON.encode_to_file (filename node) config
let update node update = read node |> update |> write node
let set_prevalidator ?(operations_request_timeout = 10.)
?(max_refused_operations = 1000) ?(operations_batch_size = 50)
?(disable_operations_precheck = false) old_config =
let prevalidator =
`O
[
("operations_request_timeout", `Float operations_request_timeout);
( "max_refused_operations",
`Float (float_of_int max_refused_operations) );
("operations_batch_size", `Float (float_of_int operations_batch_size));
("disable_precheck", `Bool disable_operations_precheck);
]
|> JSON.annotate ~origin:"set_prevalidator"
in
JSON.update
"shell"
(fun config -> JSON.put ("prevalidator", prevalidator) config)
old_config
let set_peer_validator ?(new_head_request_timeout = 60.) old_config =
let peer_validator =
`O
[
( "peer_validator",
`O [("new_head_request_timeout", `Float new_head_request_timeout)]
);
]
|> JSON.annotate ~origin:"set_peer_validator"
in
JSON.put ("shell", peer_validator) old_config
let sandbox_network_config =
`O
[
( "genesis",
`O
[
("timestamp", `String "2018-06-30T16:07:32Z");
( "block",
`String "BLockGenesisGenesisGenesisGenesisGenesisf79b5d1CoW2" );
("protocol", `String Protocol.genesis_hash);
] );
( "genesis_parameters",
`O
[
( "values",
`O [("genesis_pubkey", `String Constant.activator.public_key)]
);
] );
("chain_name", `String "TEZOS");
("sandboxed_chain_name", `String "SANDBOXED_TEZOS");
]
let ghostnet_sandbox_network_config =
`O
[
( "genesis",
`O
[
("timestamp", `String "2022-01-25T15:00:00Z");
( "block",
`String "BLockGenesisGenesisGenesisGenesisGenesis1db77eJNeJ9" );
("protocol", `String Protocol.genesis_hash);
] );
( "genesis_parameters",
`O
[
( "values",
`O [("genesis_pubkey", `String Constant.activator.public_key)]
);
] );
("chain_name", `String "TEZOS");
("sandboxed_chain_name", `String "SANDBOXED_TEZOS");
]
let set_sandbox_network_with_user_activated_upgrades upgrade_points old_config
=
let network =
sandbox_network_config
|> JSON.annotate
~origin:"set_sandbox_network_with_user_activated_upgrades"
|> JSON.put
( "user_activated_upgrades",
JSON.annotate ~origin:"user_activated_upgrades"
@@ `A
(List.map
(fun (level, protocol) ->
`O
[
("level", `Float (float level));
( "replacement_protocol",
`String (Protocol.hash protocol) );
])
upgrade_points) )
in
JSON.put ("network", network) old_config
let set_sandbox_network_with_user_activated_overrides overrides old_config =
let network =
sandbox_network_config
|> JSON.annotate
~origin:"set_sandbox_network_with_user_activated_overrides"
|> JSON.put
( "user_activated_protocol_overrides",
JSON.annotate ~origin:"user_activated_overrides"
@@ `A
(List.map
(fun (replaced_protocol, replacement_protocol) ->
`O
[
("replaced_protocol", `String replaced_protocol);
("replacement_protocol", `String replacement_protocol);
])
overrides) )
in
JSON.put ("network", network) old_config
let set_ghostnet_sandbox_network ?user_activated_upgrades () old_config =
let may_patch_user_activated_upgrades =
match user_activated_upgrades with
| None -> Fun.id
| Some upgrade_points ->
JSON.put
( "user_activated_upgrades",
JSON.annotate ~origin:"user_activated_upgrades"
@@ `A
(List.map
(fun (level, protocol) ->
`O
[
("level", `Float (float level));
( "replacement_protocol",
`String (Protocol.hash protocol) );
])
upgrade_points) )
in
JSON.put
( "network",
JSON.annotate
~origin:"set_ghostnet_sandbox_network"
ghostnet_sandbox_network_config
|> may_patch_user_activated_upgrades )
old_config
end
type snapshot_history_mode = Rolling_history | Full_history
type export_format = Tar | Raw
let spawn_snapshot_export ?(history_mode = Full_history) ?export_level
?(export_format = Tar) node file =
spawn_command
node
(["snapshot"; "export"; "--data-dir"; node.persistent_state.data_dir]
@ (match history_mode with
| Full_history -> []
| Rolling_history -> ["--rolling"])
@ optional_arg "block" string_of_int export_level
@ (["--export-format"]
@ match export_format with Tar -> ["tar"] | Raw -> ["raw"])
@ [file])
let snapshot_export ?history_mode ?export_level ?export_format node file =
spawn_snapshot_export ?history_mode ?export_level ?export_format node file
|> Process.check
let spawn_snapshot_info ?(json = false) node file =
spawn_command
node
(["snapshot"; "info"] @ (if json then ["--json"] else []) @ [file])
let snapshot_info ?json node file =
spawn_snapshot_info ?json node file |> Process.check
let spawn_snapshot_import ?(no_check = false) ?(reconstruct = false) node file =
spawn_command
node
(["snapshot"; "import"; "--data-dir"; node.persistent_state.data_dir]
@ (if reconstruct then ["--reconstruct"] else [])
@ (if no_check then ["--no-check"] else [])
@ [file])
let snapshot_import ?no_check ?reconstruct node file =
spawn_snapshot_import ?no_check ?reconstruct node file |> Process.check
let spawn_reconstruct node =
spawn_command
node
["reconstruct"; "--data-dir"; node.persistent_state.data_dir]
let reconstruct node = spawn_reconstruct node |> Process.check
let trigger_ready node value =
let pending = node.persistent_state.pending_ready in
node.persistent_state.pending_ready <- [] ;
List.iter (fun pending -> Lwt.wakeup_later pending value) pending
let set_ready node =
(match node.status with
| Not_running -> ()
| Running status -> status.session_state.ready <- true) ;
trigger_ready node (Some ())
let update_level node current_level =
(match node.status with
| Not_running -> ()
| Running status -> (
match status.session_state.level with
| Unknown -> status.session_state.level <- Known current_level
| Known old_level ->
status.session_state.level <- Known (max old_level current_level))) ;
let pending = node.persistent_state.pending_level in
node.persistent_state.pending_level <- [] ;
List.iter
(fun ((level, resolver) as pending) ->
if current_level >= level then
Lwt.wakeup_later resolver (Some current_level)
else
node.persistent_state.pending_level <-
pending :: node.persistent_state.pending_level)
pending
let update_identity node identity =
match node.status with
| Not_running -> ()
| Running status ->
(match status.session_state.identity with
| Unknown -> status.session_state.identity <- Known identity
| Known identity' ->
if identity' <> identity then Test.fail "node identity changed") ;
let pending = node.persistent_state.pending_identity in
node.persistent_state.pending_identity <- [] ;
List.iter
(fun resolver -> Lwt.wakeup_later resolver (Some identity))
pending
let handle_event node {name; value; timestamp = _} =
match name with
| "node_is_ready.v0" -> set_ready node
| "head_increment.v0" | "branch_switch.v0" -> (
match JSON.(value |-> "level" |> as_int_opt) with
| None ->
(* There are several kinds of events and maybe
this one is not the one with the level: ignore it. *)
()
| Some level -> update_level node level)
| "read_identity.v0" -> update_identity node (JSON.as_string value)
| "compilation_error.v0" -> (
match JSON.as_string_opt value with
| Some fname ->
if Sys.file_exists fname then (
let content = read_file fname in
Log.error "Protocol compilation failed:" ;
Log.error "%s" (String.trim content))
else
Log.error
"Protocol compilation failed but log file %S was not found"
fname
| None ->
Log.error "Protocol compilation failed but cannot read the payload")
| "set_head.v0" -> (
match JSON.(value |> geti 1 |> as_int_opt) with
| None -> ()
| Some level -> update_level node level)
| _ -> ()
let check_event ?where node name promise =
let* result = promise in
match result with
| None ->
raise (Terminated_before_event {daemon = node.name; event = name; where})
| Some x -> return x
let wait_for_ready node =
match node.status with
| Running {session_state = {ready = true; _}; _} -> unit
| Not_running | Running {session_state = {ready = false; _}; _} ->
let promise, resolver = Lwt.task () in
node.persistent_state.pending_ready <-
resolver :: node.persistent_state.pending_ready ;
check_event node "node_is_ready.v0" promise
let wait_for_level node level =
match node.status with
| Running {session_state = {level = Known current_level; _}; _}
when current_level >= level ->
return current_level
| Not_running | Running _ ->
let promise, resolver = Lwt.task () in
node.persistent_state.pending_level <-
(level, resolver) :: node.persistent_state.pending_level ;
check_event
node
"head_increment.v0 / branch_switch.v0"
~where:("level >= " ^ string_of_int level)
promise
let get_level node =
match node.status with
| Running {session_state = {level = Known level; _}; _} -> level
| Not_running | Running _ -> 0
let wait_for_identity node =
match node.status with
| Running {session_state = {identity = Known identity; _}; _} ->
return identity
| Not_running | Running _ ->
let promise, resolver = Lwt.task () in
node.persistent_state.pending_identity <-
resolver :: node.persistent_state.pending_identity ;
check_event node "read_identity.v0" promise
let wait_for_request ~request node =
let event_name =
match request with
| `Inject -> "request_completed_info.v0"
| `Flush -> "request_completed_info.v0"
| `Notify | `Arrived -> "request_completed_debug.v0"
in
let request_str =
match request with
| `Flush -> "flush"
| `Inject -> "inject"
| `Notify -> "notify"
| `Arrived -> "arrived"
in
let filter json =
match JSON.(json |-> "view" |-> "request" |> as_string_opt) with
| Some s when String.equal s request_str -> Some ()
| Some _ | None -> None
in
wait_for node event_name filter
let wait_for_connections node connections =
let counter = ref 0 in
let waiter, resolver = Lwt.task () in
on_event node (fun {name; _} ->
match name with
| "connection.v0" ->
incr counter ;
if !counter = connections then Lwt.wakeup resolver ()
| _ -> ()) ;
let* () = wait_for_ready node in
waiter
let wait_for_disconnections node disconnections =
let counter = ref 0 in
let waiter, resolver = Lwt.task () in
on_event node (fun {name; _} ->
match name with
| "disconnection.v0" ->
incr counter ;
if !counter = disconnections then Lwt.wakeup resolver ()
| _ -> ()) ;
let* () = wait_for_ready node in
waiter
let create ?runner ?(path = Constant.tezos_node) ?name ?color ?data_dir
?event_pipe ?net_port ?advertised_net_port ?(rpc_host = "localhost")
?rpc_port ?rpc_tls ?(allow_all_rpc = true) arguments =
let name = match name with None -> fresh_name () | Some name -> name in
let data_dir =
match data_dir with None -> Temp.dir ?runner name | Some dir -> dir
in
let net_port =
match net_port with None -> Port.fresh () | Some port -> port
in
let rpc_port =
match rpc_port with None -> Port.fresh () | Some port -> port
in
let arguments = add_default_arguments arguments in
let default_expected_pow =
list_find_map (function Expected_pow x -> Some x | _ -> None) arguments
|> Option.value ~default:0
in
let node =
create
?runner
~path
~name
?color
?event_pipe
{
data_dir;
net_port;
advertised_net_port;
rpc_host;
rpc_port;
rpc_tls;
allow_all_rpc;
default_arguments = arguments;
arguments;
default_expected_pow;
runner;
pending_ready = [];
pending_level = [];
pending_identity = [];
}
in
on_event node (handle_event node) ;
node
let add_argument node argument =
node.persistent_state.arguments <- argument :: node.persistent_state.arguments
let add_peer node peer =
let address =
Runner.address
?from:node.persistent_state.runner
peer.persistent_state.runner
^ ":"
in
add_argument node (Peer (address ^ string_of_int (net_port peer)))
let point ?from node =
let from =
match from with None -> None | Some peer -> peer.persistent_state.runner
in
let address = Runner.address ?from node.persistent_state.runner in
(address, net_port node)
let point_str ?from node =
let addr, port = point ?from node in
addr ^ ":" ^ Int.to_string port
let point_and_id ?from node =
let point = point_str ?from node in
let* id = wait_for_identity node in
Lwt.return (point ^ "#" ^ id)
let add_peer_with_id node peer =
let* peer = point_and_id ~from:node peer in
add_argument node (Peer peer) ;
Lwt.return_unit
let get_peers node =
List.filter_map
(fun arg -> match arg with Peer s -> Some s | _ -> None)
node.persistent_state.arguments
let remove_peers_json_file node =
let filename = sf "%s/peers.json" (data_dir node) in
Log.info "Removing file %s" filename ;
Sys.remove filename
(** [runlike_command_arguments node command arguments]
evaluates in a list of strings containing all command
line arguments needed to spawn a [command] like [run]
or [replay] for the given [node] and extra [arguments]. *)
let runlike_command_arguments node command arguments =
let net_addr, rpc_addr =
match node.persistent_state.runner with
| None -> ("127.0.0.1:", node.persistent_state.rpc_host ^ ":")
| Some _ ->
(* FIXME spawn an ssh tunnel in case of remote host *)
("0.0.0.0:", "0.0.0.0:")
in
let arguments =
List.fold_left
add_missing_argument
arguments
node.persistent_state.arguments
in
let command_args = make_arguments arguments in
let command_args =
match node.persistent_state.advertised_net_port with
| None -> command_args
| Some port -> "--advertised-net-port" :: string_of_int port :: command_args
in
let command_args =
if node.persistent_state.allow_all_rpc then
"--allow-all-rpc"
:: (rpc_addr ^ string_of_int node.persistent_state.rpc_port)
:: command_args
else command_args
in
let command_args =
match node.persistent_state.rpc_tls with
| None -> command_args
| Some {certificate_path; key_path} ->
"--rpc-tls" :: (certificate_path ^ "," ^ key_path) :: command_args
in
command :: "--data-dir" :: node.persistent_state.data_dir :: "--net-addr"
:: (net_addr ^ string_of_int node.persistent_state.net_port)
:: "--rpc-addr"
:: (rpc_addr ^ string_of_int node.persistent_state.rpc_port)
:: command_args
let do_runlike_command ?(on_terminate = fun _ -> ()) ?event_level
?event_sections_levels node arguments =
(match node.status with
| Not_running -> ()
| Running _ -> Test.fail "node %s is already running" node.name) ;
let on_terminate status =
on_terminate status ;
(* Cancel all [Ready] event listeners. *)
trigger_ready node None ;
(* Cancel all [Level_at_least] event listeners. *)
let pending = node.persistent_state.pending_level in
node.persistent_state.pending_level <- [] ;
List.iter (fun (_, pending) -> Lwt.wakeup_later pending None) pending ;
(* Cancel all [Read_identity] event listeners. *)
let pending = node.persistent_state.pending_identity in
node.persistent_state.pending_identity <- [] ;
List.iter (fun pending -> Lwt.wakeup_later pending None) pending ;
unit
in
run
?runner:node.persistent_state.runner
?event_level
?event_sections_levels
node
{ready = false; level = Unknown; identity = Unknown}
arguments
~on_terminate
let run ?patch_config ?on_terminate ?event_level ?event_sections_levels node
arguments =
let () =
match patch_config with
| None -> ()
| Some patch -> Config_file.update node patch
in
let arguments = runlike_command_arguments node "run" arguments in
do_runlike_command
?on_terminate
?event_level
?event_sections_levels
node
arguments
let replay ?on_terminate ?event_level ?event_sections_levels ?(strict = false)
?(blocks = ["head"]) node arguments =
let strict = if strict then ["--strict"] else [] in
let arguments =
runlike_command_arguments node "replay" arguments @ strict @ blocks
in
do_runlike_command
?on_terminate
?event_level
?event_sections_levels
node
arguments
let init ?runner ?path ?name ?color ?data_dir ?event_pipe ?net_port
?advertised_net_port ?rpc_host ?rpc_port ?rpc_tls ?event_level
?event_sections_levels ?patch_config ?snapshot arguments =
(* The single process argument does not exist in the configuration
file of the node. It is only known as a command-line option. As a
consequence, we filter Singleprocess from the list of arguments
passed to create, and we readd it if necessary when calling
run. *)
let single_process = List.mem Singleprocess arguments in
let node =
create
?runner
?path
?name
?color
?data_dir
?event_pipe
?net_port
?advertised_net_port
?rpc_host
?rpc_port
?rpc_tls
(List.filter (fun x -> x <> Singleprocess) arguments)
in
let* () = identity_generate node in
let* () = config_init node [] in
let* () =
match snapshot with
| Some (file, reconstruct) -> snapshot_import ~reconstruct node file
| None -> unit
in
let argument = if single_process then [Singleprocess] else [] in
let* () =
run ?patch_config ?event_level ?event_sections_levels node argument
in
let* () = wait_for_ready node in
return node
let send_raw_data node ~data =
(* Extracted from Lwt_utils_unix. *)
let write_string ?(pos = 0) ?len descr buf =
let len = match len with None -> String.length buf - pos | Some l -> l in
let rec inner pos len =
if len = 0 then Lwt.return_unit
else
Lwt.bind (Lwt_unix.write_string descr buf pos len) (function
| 0 ->
Lwt.fail End_of_file
(* other endpoint cleanly closed its connection *)
| nb_written -> inner (pos + nb_written) (len - nb_written))
in
inner pos len
in
Log.debug "Write raw data to node %s" node.name ;
let socket = Lwt_unix.socket PF_INET SOCK_STREAM 0 in
Lwt_unix.set_close_on_exec socket ;
let uaddr = Lwt_unix.ADDR_INET (Unix.inet_addr_loopback, net_port node) in
let* () = Lwt_unix.connect socket uaddr in
write_string socket data
let upgrade_storage node =
spawn_command
node
["upgrade"; "storage"; "--data-dir"; node.persistent_state.data_dir]
|> Process.check
let get_version node =
let version_flag = make_argument Version in
let* output =
spawn_command node version_flag |> Process.check_and_read_stdout
in
return @@ String.trim output
![swh spinner](/static/img/swh-spinner.gif)
Computing file changes ...