(* Revision 5c0277382a82ab3642ddd1de790a362e8c047837 authored by Pierre-Louis
   on 11 September 2023, 13:01:33 UTC, committed by Pierre-Louis
   on 25 September 2023, 12:16:22 UTC.
   1 parent 97eebdf
   File: storage_snapshots.ml *)
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
(* Testing
-------
Component: Storage snapshot
Invocation: dune exec tezt/tests/main.exe -- -f storage_snapshots.ml
Subject: Tests both the snapshot mechanism and the store's behaviour
*)
(* Arguments common to every node started by the tests in this file. *)
let node_arguments = [Node.Synchronisation_threshold 0]
(* Pretty-prints a snapshot export format ([Tar] or [Raw]). *)
let pp_snapshot_export_format fmt v =
  let s = match v with Node.Tar -> "tar" | Raw -> "raw" in
  Format.pp_print_string fmt s
(* Pretty-prints a snapshot history mode ([Rolling_history] or
   [Full_history]). *)
let pp_snapshot_history_mode fmt v =
  let s =
    match v with
    | Node.Rolling_history -> "rolling"
    | Node.Full_history -> "full"
  in
  Format.pp_print_string fmt s
(* Fetches the protocol constants of the chain head and returns
   [(preserved_cycles, blocks_per_cycle, max_operations_time_to_live)]. *)
let get_constants client =
  let* constants =
    RPC.Client.call client @@ RPC.get_chain_block_context_constants ()
  in
  (* Reads an integer field from the constants JSON object. *)
  let int_field name = JSON.(constants |-> name |> as_int) in
  let preserved_cycles = int_field "preserved_cycles" in
  let blocks_per_cycle = int_field "blocks_per_cycle" in
  let max_op_ttl = int_field "max_operations_time_to_live" in
  return (preserved_cycles, blocks_per_cycle, max_op_ttl)
(* Exports a snapshot of [node] at [export_level] into [snapshot_dir],
   with the given [history_mode] and [export_format]. Returns the
   snapshot file name (without the directory). *)
let export_snapshot node ~export_level ~snapshot_dir ~history_mode
    ~export_format =
  Log.info
    "Exporting %a snapshot for %s at level %d"
    pp_snapshot_history_mode
    history_mode
    (Node.name node)
    export_level ;
  let history_mode_str =
    Format.asprintf "%a" pp_snapshot_history_mode history_mode
  in
  let export_format_str =
    Format.asprintf "%a" pp_snapshot_export_format export_format
  in
  (* The file name encodes the source node, history mode and format,
     e.g. "archive_node.rolling.tar". *)
  let filename =
    Printf.sprintf "%s.%s.%s" (Node.name node) history_mode_str export_format_str
  in
  let snapshot_path = snapshot_dir // filename in
  let* () =
    Node.snapshot_export
      ~export_level
      ~history_mode
      ~export_format
      node
      snapshot_path
  in
  Log.info
    "Node %s exported %s (level %d, format %s)"
    (Node.name node)
    filename
    export_level
    export_format_str ;
  return filename
(* Checks that the node's head, checkpoint, savepoint and caboose are
   the expected ones *)
let check_consistency_after_import node ~expected_head ~expected_checkpoint
    ~expected_savepoint ~expected_caboose =
  Log.info "Checking node consistency for %s" (Node.name node) ;
  let* head_block = RPC.call node @@ RPC.get_chain_block () in
  let head_level = JSON.(head_block |-> "header" |-> "level" |> as_int) in
  let* {level = checkpoint_level; _} =
    RPC.call node @@ RPC.get_chain_level_checkpoint ()
  in
  let* {level = savepoint_level; _} =
    RPC.call node @@ RPC.get_chain_level_savepoint ()
  in
  let* {level = caboose_level; _} =
    RPC.call node @@ RPC.get_chain_level_caboose ()
  in
  Check.((head_level = expected_head) int)
    ~error_msg:"expected level = %R, got %L" ;
  Check.((checkpoint_level = expected_checkpoint) int)
    ~error_msg:"expected checkpoint = %R, got %L" ;
  Check.((savepoint_level = expected_savepoint) int)
    ~error_msg:"expected savepoint = %R, got %L" ;
  Check.((caboose_level = expected_caboose) int)
    ~error_msg:"expected caboose = %R, got %L" ;
  unit
(* Checks which blocks of [node] are still available, given its
   [history_mode], current [head], [savepoint] and [caboose]:
   - in full mode, blocks strictly below the savepoint keep their
     header but have no metadata;
   - otherwise (rolling), blocks strictly below the caboose are pruned
     entirely (even the header is gone);
   - blocks from the savepoint up to the head keep their metadata. *)
let check_blocks_availability node ~history_mode ~head ~savepoint ~caboose =
  (* The metadata of genesis is available anyway *)
  Log.info "Checking blocks availability for %s" (Node.name node) ;
  let* (_ : RPC.block_metadata) =
    RPC.call node @@ RPC.get_chain_block_metadata ~block:"0" ()
  in
  (* Applies [f] sequentially to the levels [a]..[b], rendered as
     strings, from highest to lowest. *)
  let iter_block_range_s a b f =
    Lwt_list.iter_s f (range a b |> List.rev |> List.map string_of_int)
  in
  (* Asserts the block header is stored but its metadata is pruned. *)
  let expect_no_metadata block =
    (* Expects success, as the header must be stored. *)
    let* (_ : JSON.t) = RPC.call node @@ RPC.get_chain_block_header ~block () in
    (* Expects failure, as the metadata must not be stored. *)
    let* {body; code} =
      RPC.call_json node @@ RPC.get_chain_block_metadata ~block ()
    in
    (* In the client, attempting to retrieve missing metadata outputs:
       Command failed: Unable to find block *)
    Check.(
      (code = 500) ~__LOC__ int ~error_msg:"Expected HTTP status %R, got %L.") ;
    (* The RPC error body is a JSON array of errors; the first one
       carries the store's "metadata not found" identifier. *)
    let error_id = JSON.(body |=> 0 |-> "id" |> as_string) in
    Check.(
      (error_id = "store.metadata_not_found")
        ~__LOC__
        string
        ~error_msg:"Expected error id %R, got %L") ;
    unit
  in
  (* Asserts the block and its metadata are both available. *)
  let expect_metadata block =
    (* Expects success, as the metadata must be stored. *)
    let* (_ : RPC.block_metadata) =
      RPC.call node @@ RPC.get_chain_block_metadata ~block ()
    in
    unit
  in
  (* Asserts the block is fully pruned: looking it up yields a 404. *)
  let expect_no_block block =
    let* {code; _} = RPC.call_raw node @@ RPC.get_chain_block ~block () in
    (* In the client, attempting to retrieve an unknown block outputs:
       Did not find service *)
    Check.(
      (code = 404) ~__LOC__ int ~error_msg:"Expected HTTP status %R, got %L.") ;
    unit
  in
  let* () =
    match history_mode with
    | Node.Full_history ->
        iter_block_range_s 1 (savepoint - 1) @@ expect_no_metadata
    | _ ->
        (* A caboose of 0 means nothing has been pruned yet. *)
        if caboose <> 0 then
          iter_block_range_s 1 (caboose - 1) @@ expect_no_block
        else unit
  in
  let* () = iter_block_range_s savepoint head @@ expect_metadata in
  unit
(* Waits until every node of [cluster] has reached [expected_level]. *)
let sync_all_nodes cluster expected_level =
  (* The reached levels are not needed; only the synchronization is. *)
  let* (_ : int list) =
    Lwt_list.map_p (fun node -> Node.wait_for_level node expected_level) cluster
  in
  (* Fix: grammar of the log message ("has" -> "have"). *)
  Log.info "All nodes have caught up on level %d!" expected_level ;
  unit
(* Bakes [blocks_to_bake] blocks on [node] through [client], waiting
   for each block to be applied before baking the next one. *)
let bake_blocks node client ~blocks_to_bake =
  Log.info "Baking a batch of %d blocks on %s" blocks_to_bake (Node.name node) ;
  let bake_one () =
    Client.bake_for_and_wait
      ~endpoint:(Node node)
      ~node
      ~minimal_timestamp:true
      client
  in
  repeat blocks_to_bake bake_one
(* Exports a snapshot from [node] at [export_level], imports it into a
   fresh node and checks the imported store's consistency and block
   availability. The round-trip is performed twice (one directory per
   round): the second export is taken from the node imported in the
   first round, so re-exported snapshots are also validated. *)
let export_import_and_check node ~export_level ~history_mode ~export_format
    ~max_op_ttl =
  let export_dirs = List.map Temp.dir ["first_export"; "second_export"] in
  (* The fold threads the node to export from: the initial [node]
     first, then each freshly imported node. *)
  let* final_node =
    Lwt_list.fold_left_s
      (fun node snapshot_dir ->
        let* filename =
          export_snapshot
            node
            ~export_level
            ~snapshot_dir
            ~history_mode
            ~export_format
        in
        let fresh_node_name =
          Format.asprintf
            "%a_node_from_%s"
            pp_snapshot_history_mode
            history_mode
            filename
        in
        (* [false]: the snapshot import is expected to succeed. *)
        let* fresh_node =
          Node.init
            ~name:fresh_node_name
            ~snapshot:(snapshot_dir // filename, false)
            node_arguments
        in
        let* () = Node.wait_for_ready fresh_node in
        (* Right after import, checkpoint and savepoint are at the
           export level; the caboose is 0 in full mode and lags the
           export level by at most [max_op_ttl] in rolling mode. *)
        let expected_checkpoint, expected_savepoint, expected_caboose =
          match history_mode with
          | Node.Full_history -> (export_level, export_level, 0)
          | Node.Rolling_history ->
              (export_level, export_level, max 0 (export_level - max_op_ttl))
        in
        let* () =
          check_consistency_after_import
            fresh_node
            ~expected_head:export_level
            ~expected_checkpoint
            ~expected_savepoint
            ~expected_caboose
        in
        let* () =
          check_blocks_availability
            fresh_node
            ~history_mode
            ~head:export_level
            ~savepoint:expected_savepoint
            ~caboose:expected_caboose
        in
        (* The fresh node is stopped before being reused as the source
           of the next (offline) export. *)
        let* () = Node.terminate fresh_node in
        return fresh_node)
      node
      export_dirs
  in
  (* NOTE(review): [final_node] was already terminated inside the last
     fold iteration; this second terminate looks redundant — confirm it
     is a harmless no-op on a stopped node. *)
  let* () = Node.terminate final_node in
  unit
(* This test aims to:
   - start 3 nodes: an archive, a full and a rolling one,
   - bake few blocks using the archive node as a baker,
   - export all kinds of snapshots (in terms of both history mode and
     export format) from each node,
   - import those snapshots and start the fresh nodes accordingly,
   - check the consistency and data availability of those fresh nodes,
   - re-export from the fresh nodes to re-import and re-check the imported data. *)
let test_export_import_snapshots =
  Protocol.register_test
    ~__FILE__
    ~title:"storage snapshot export and import"
    ~tags:["storage"; "snapshot"; "export"; "import"]
  @@ fun protocol ->
  let archive_node =
    Node.create
      ~name:"archive_node"
      (node_arguments @ Node.[History_mode Archive])
  in
  let full_node =
    Node.create
      ~name:"full_node"
      (node_arguments @ Node.[History_mode (Full None)])
  in
  let rolling_node =
    Node.create
      ~name:"rolling_node"
      (node_arguments @ Node.[History_mode (Rolling None)])
  in
  let cluster = [archive_node; full_node; rolling_node] in
  (* Connect every node to every other one. *)
  Cluster.clique cluster ;
  let* () = Cluster.start ~public:true cluster in
  let* client = Client.init ~endpoint:(Node archive_node) () in
  let* () = Client.activate_protocol_and_wait ~protocol client in
  let* preserved_cycles, blocks_per_cycle, max_op_ttl = get_constants client in
  (* Bake enough blocks so that the rolling node caboose is not at the
     genesis anymore. To do so, we need to bake at least 3 cycles,
     after activating the protocol, i.e 3*8 = 24 blocks. *)
  let blocks_to_bake = (preserved_cycles + 1) * blocks_per_cycle in
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* archive_level = Node.get_level archive_node in
  let* () = sync_all_nodes cluster archive_level in
  (* Terminate all nodes to save resources. Note: we may consider
     that exporting a snapshot from a node that is running is an
     actual usecase (such exports are already done in other tests). *)
  let* () = Lwt_list.iter_p Node.terminate cluster in
  (* Fix the export level and [max_op_ttl] once for all round-trips. *)
  let export_import_and_check =
    export_import_and_check ~max_op_ttl ~export_level:archive_level
  in
  (* For both export formats: full snapshots are exported from the
     archive and full nodes only; rolling snapshots from all three. *)
  let* () =
    Lwt_list.iter_s
      (fun export_format ->
        Lwt_list.iter_s
          (fun (history_mode, nodes) ->
            Lwt_list.iter_s
              (fun node ->
                export_import_and_check node ~history_mode ~export_format)
              nodes)
          [
            (Node.Full_history, [archive_node; full_node]);
            (Node.Rolling_history, [archive_node; full_node; rolling_node]);
          ])
      Node.[Tar; Raw]
  in
  unit
(* This test aims to export and import a rolling snapshot, bake some
   blocks and make sure that the checkpoint, savepoint and caboose are
   well dragged. *)
let test_drag_after_rolling_import =
  Protocol.register_test
    ~__FILE__
    ~title:"storage snapshot drag after rolling import"
    ~tags:["storage"; "snapshot"; "drag"; "rolling"; "import"]
  @@ fun protocol ->
  let archive_node =
    Node.create
      ~name:"archive_node"
      (node_arguments @ Node.[History_mode Archive])
  in
  (* The number of additional cycles is expected to reflect the
     default value from
     Shell_services.History_modes.default_additional_cycles. *)
  let additional_cycles = 1 in
  let rolling_node =
    Node.create
      ~name:"rolling_node"
      (node_arguments @ Node.[History_mode (Rolling (Some additional_cycles))])
  in
  let cluster = [archive_node; rolling_node] in
  Cluster.clique cluster ;
  let* () = Cluster.start ~public:true cluster in
  let* client = Client.init ~endpoint:(Node archive_node) () in
  let* () = Client.activate_protocol_and_wait ~protocol client in
  let* preserved_cycles, blocks_per_cycle, max_op_ttl = get_constants client in
  (* Baking enough blocks so that the caboose is not the genesis
     anymore (depending on the max_op_ttl) *)
  Log.info "Baking a few blocks" ;
  let blocks_to_bake = (1 * blocks_per_cycle) + max_op_ttl in
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* archive_level = Node.get_level archive_node in
  let* () = sync_all_nodes [archive_node; rolling_node] archive_level in
  let* export_level = Node.get_level archive_node in
  let snapshot_dir = Temp.dir "snapshots_exports" in
  let history_mode = Node.Rolling_history in
  Log.info "Exporting snapshot at level %d" export_level ;
  let* filename =
    export_snapshot
      rolling_node
      ~export_level
      ~snapshot_dir
      ~history_mode
      ~export_format:Node.Tar
  in
  let fresh_node_name =
    Format.asprintf
      "%a_node_from_%s"
      pp_snapshot_history_mode
      history_mode
      filename
  in
  (* [false]: the snapshot import is expected to succeed. *)
  let* fresh_node =
    Node.init
      ~name:fresh_node_name
      ~snapshot:(snapshot_dir // filename, false)
      node_arguments
  in
  (* Number of blocks to bake after the import so that the checkpoint,
     savepoint and caboose get dragged past the snapshot's level. *)
  let blocks_to_bake =
    (preserved_cycles + additional_cycles) * blocks_per_cycle
  in
  (* Right after import, checkpoint and savepoint sit at the export
     level and the caboose lags it by at most [max_op_ttl]. *)
  let expected_checkpoint, expected_savepoint, expected_caboose =
    (export_level, export_level, max 0 (export_level - max_op_ttl))
  in
  let* () =
    check_consistency_after_import
      fresh_node
      ~expected_head:export_level
      ~expected_checkpoint
      ~expected_savepoint
      ~expected_caboose
  in
  let* () =
    check_blocks_availability
      fresh_node
      ~history_mode
      ~head:export_level
      ~savepoint:expected_savepoint
      ~caboose:expected_caboose
  in
  (* Bake on the archive node, then let the fresh node catch up. *)
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* () = Client.Admin.connect_address ~peer:fresh_node client in
  let* expected_head = Node.get_level archive_node in
  let* (_ : int) = Node.wait_for_level fresh_node expected_head in
  (* After the drag, checkpoint and savepoint are a fixed number of
     cycles behind the new head, and the caboose trails the savepoint
     by [max_op_ttl]. *)
  let expected_checkpoint, expected_savepoint, expected_caboose =
    match history_mode with
    | Node.Full_history -> Test.fail "testing only rolling mode"
    | Node.Rolling_history ->
        let checkpoint =
          expected_head - (preserved_cycles * blocks_per_cycle)
        in
        let savepoint =
          expected_head
          - ((preserved_cycles + additional_cycles) * blocks_per_cycle)
        in
        (checkpoint, savepoint, savepoint - max_op_ttl)
  in
  let* () =
    check_consistency_after_import
      fresh_node
      ~expected_head
      ~expected_checkpoint
      ~expected_savepoint
      ~expected_caboose
  in
  (* Fix: check availability up to the node's current head
     ([expected_head]) rather than the stale [export_level] — the fresh
     node has caught up with the archive node since the import, and the
     dragged savepoint may even exceed the old export level. *)
  let* () =
    check_blocks_availability
      fresh_node
      ~history_mode
      ~head:expected_head
      ~savepoint:expected_savepoint
      ~caboose:expected_caboose
  in
  unit
(* Checks that the hash, level and version contained in the snapshot
   is consistent with regard to the exported data. *)
let test_info_command =
  Protocol.register_test
    ~__FILE__
    (* Fix: drop the needless [Format.asprintf] around a constant
       string (it contained no format specifiers). *)
    ~title:"storage snapshot info command"
    ~tags:["storage"; "snapshot"; "info"; "command"]
  @@ fun protocol ->
  let* node = Node.init ~name:"node" node_arguments in
  let* client = Client.init ~endpoint:(Node node) () in
  let* () = Client.activate_protocol_and_wait ~protocol ~node client in
  let blocks_to_bake = 8 in
  let* () = bake_blocks node client ~blocks_to_bake in
  let* head = RPC.call node @@ RPC.get_chain_block () in
  let head_level = JSON.(head |-> "header" |-> "level" |> as_int) in
  let snapshot_dir = Temp.dir "snapshots_exports" in
  let* filename =
    export_snapshot
      node
      ~export_level:head_level
      ~snapshot_dir
      ~history_mode:Node.Rolling_history
      ~export_format:Node.Tar
  in
  let expected_hash = JSON.(head |-> "hash" |> as_string) in
  let expected_level = head_level in
  (* Expected snapshot format version; must follow any bump of the
     node's snapshot version. *)
  let expected_version = 6 in
  Log.info "Checks the human formatted output" ;
  (* Get the info line, which is the second line. *)
  let* () =
    let p = Node.spawn_snapshot_info node (snapshot_dir // filename) in
    let* human_output = Process.check_and_read_stdout p in
    let human_output = List.nth (String.split_on_char '\n' human_output) 1 in
    let re =
      rex
        "chain \\w+, block hash (\\w+) at level (\\d+),.* \\(snapshot version \
         (\\d+)\\)"
    in
    match human_output =~*** re with
    | Some (hash, level, version) ->
        Check.(
          (hash = expected_hash)
            string
            ~error_msg:"expected block hash %R, got %L" ;
          (int_of_string level = expected_level)
            int
            ~error_msg:"expected block level %R, got %L" ;
          (int_of_string version = expected_version)
            int
            ~error_msg:"expected version \"%R\", got %L") ;
        unit
    | None -> Test.fail "Could not parse output %s" human_output
  in
  Log.info "Checks the JSON formatted output" ;
  let* () =
    let p =
      Node.spawn_snapshot_info ~json:true node (snapshot_dir // filename)
    in
    let* json_output = Process.check_and_read_stdout p in
    let json_output =
      JSON.parse ~origin:"node snapshot info --json" json_output
    in
    let json_snapshot_header = JSON.(json_output |-> "snapshot_header") in
    let hash = JSON.(json_snapshot_header |-> "block_hash" |> as_string) in
    let level = JSON.(json_snapshot_header |-> "level" |> as_int) in
    let version = JSON.(json_snapshot_header |-> "version" |> as_int) in
    Check.((hash = expected_hash) string)
      ~error_msg:"expected block hash %R not found" ;
    Check.((level = expected_level) int)
      ~error_msg:"expected block level %R not found" ;
    Check.((version = expected_version) int)
      ~error_msg:"expected version \"%R\" not found" ;
    unit
  in
  unit
(* Registers every test of this file, in order, for [protocols]. *)
let register ~protocols =
  List.iter
    (fun register_test -> register_test protocols)
    [
      test_export_import_snapshots;
      test_drag_after_rolling_import;
      test_info_command;
    ]
(* Computing file changes ... *)