Revision 5c0277382a82ab3642ddd1de790a362e8c047837 authored by Pierre-Louis on 11 September 2023, 13:01:33 UTC, committed by Pierre-Louis on 25 September 2023, 12:16:22 UTC
1 parent 97eebdf
Raw File
storage_snapshots.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2022 Nomadic Labs <contact@nomadic-labs.com>                *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(* Testing
   -------
   Component: Storage snapshot
   Invocation: dune exec tezt/tests/main.exe -- -f storage_snapshots.ml
   Subject: Tests both the snapshot mechanism and the store's behaviour
*)

let node_arguments = Node.[Synchronisation_threshold 0]

let pp_snapshot_export_format fmt v =
  Format.fprintf fmt "%s" (match v with Node.Tar -> "tar" | Raw -> "raw")

let pp_snapshot_history_mode fmt v =
  Format.fprintf
    fmt
    "%s"
    (match v with
    | Node.Rolling_history -> "rolling"
    | Node.Full_history -> "full")

let get_constants client =
  let* constants =
    RPC.Client.call client @@ RPC.get_chain_block_context_constants ()
  in
  let preserved_cycles = JSON.(constants |-> "preserved_cycles" |> as_int) in
  let blocks_per_cycle = JSON.(constants |-> "blocks_per_cycle" |> as_int) in
  let max_op_ttl =
    JSON.(constants |-> "max_operations_time_to_live" |> as_int)
  in
  return (preserved_cycles, blocks_per_cycle, max_op_ttl)

let export_snapshot node ~export_level ~snapshot_dir ~history_mode
    ~export_format =
  Log.info
    "Exporting %a snapshot for %s at level %d"
    pp_snapshot_history_mode
    history_mode
    (Node.name node)
    export_level ;
  let filename =
    Format.asprintf
      "%s.%a.%a"
      (Node.name node)
      pp_snapshot_history_mode
      history_mode
      pp_snapshot_export_format
      export_format
  in
  let* () =
    Node.snapshot_export
      ~export_level
      ~history_mode
      ~export_format
      node
      (snapshot_dir // filename)
  in
  Log.info
    "Node %s exported %s (level %d, format %s)"
    (Node.name node)
    filename
    export_level
    (Format.asprintf "%a" pp_snapshot_export_format export_format) ;
  return filename

(* Checks that the node's head, checkpoint, savepoint and caboose are
   the expected ones *)
let check_consistency_after_import node ~expected_head ~expected_checkpoint
    ~expected_savepoint ~expected_caboose =
  Log.info "Checking node consistency for %s" (Node.name node) ;
  let* block_head = RPC.call node @@ RPC.get_chain_block () in
  let level = JSON.(block_head |-> "header" |-> "level" |> as_int) in
  let* {level = checkpoint; _} =
    RPC.call node @@ RPC.get_chain_level_checkpoint ()
  in
  let* {level = savepoint; _} =
    RPC.call node @@ RPC.get_chain_level_savepoint ()
  in
  let* {level = caboose; _} = RPC.call node @@ RPC.get_chain_level_caboose () in
  Check.((level = expected_head) int) ~error_msg:"expected level = %R, got %L" ;
  Check.((checkpoint = expected_checkpoint) int)
    ~error_msg:"expected checkpoint = %R, got %L" ;
  Check.((savepoint = expected_savepoint) int)
    ~error_msg:"expected savepoint = %R, got %L" ;
  Check.((caboose = expected_caboose) int)
    ~error_msg:"expected caboose = %R, got %L" ;
  unit

let check_blocks_availability node ~history_mode ~head ~savepoint ~caboose =
  (* The metadata of genesis is available anyway *)
  Log.info "Checking blocks availability for %s" (Node.name node) ;
  let* (_ : RPC.block_metadata) =
    RPC.call node @@ RPC.get_chain_block_metadata ~block:"0" ()
  in
  let iter_block_range_s a b f =
    Lwt_list.iter_s f (range a b |> List.rev |> List.map string_of_int)
  in
  let expect_no_metadata block =
    (* Expects success, as the header must be stored. *)
    let* (_ : JSON.t) = RPC.call node @@ RPC.get_chain_block_header ~block () in
    (* Expects failure, as the metadata must not be stored. *)
    let* {body; code} =
      RPC.call_json node @@ RPC.get_chain_block_metadata ~block ()
    in
    (* In the client, attempting to retrieve missing metadata outputs:
       Command failed: Unable to find block *)
    Check.(
      (code = 500) ~__LOC__ int ~error_msg:"Expected HTTP status %R, got %L.") ;
    let error_id = JSON.(body |=> 0 |-> "id" |> as_string) in
    Check.(
      (error_id = "store.metadata_not_found")
        ~__LOC__
        string
        ~error_msg:"Expected error id %R, got %L") ;
    unit
  in
  let expect_metadata block =
    (* Expects success, as the metadata must be stored. *)
    let* (_ : RPC.block_metadata) =
      RPC.call node @@ RPC.get_chain_block_metadata ~block ()
    in
    unit
  in
  let expect_no_block block =
    let* {code; _} = RPC.call_raw node @@ RPC.get_chain_block ~block () in
    (* In the client, attempting to retrieve an unknown block outputs:
       Did not find service *)
    Check.(
      (code = 404) ~__LOC__ int ~error_msg:"Expected HTTP status %R, got %L.") ;
    unit
  in
  let* () =
    match history_mode with
    | Node.Full_history ->
        iter_block_range_s 1 (savepoint - 1) @@ expect_no_metadata
    | _ ->
        if caboose <> 0 then
          iter_block_range_s 1 (caboose - 1) @@ expect_no_block
        else unit
  in
  let* () = iter_block_range_s savepoint head @@ expect_metadata in
  unit

let sync_all_nodes cluster expected_level =
  let* _ =
    Lwt_list.map_p (fun node -> Node.wait_for_level node expected_level) cluster
  in
  Log.info "All nodes has caught up on level %d!" expected_level ;
  unit

let bake_blocks node client ~blocks_to_bake =
  Log.info "Baking a batch of %d blocks on %s" blocks_to_bake (Node.name node) ;
  repeat blocks_to_bake @@ fun () ->
  Client.bake_for_and_wait
    ~endpoint:(Node node)
    ~node
    ~minimal_timestamp:true
    client

let export_import_and_check node ~export_level ~history_mode ~export_format
    ~max_op_ttl =
  let export_dirs = List.map Temp.dir ["first_export"; "second_export"] in
  let* final_node =
    Lwt_list.fold_left_s
      (fun node snapshot_dir ->
        let* filename =
          export_snapshot
            node
            ~export_level
            ~snapshot_dir
            ~history_mode
            ~export_format
        in
        let fresh_node_name =
          Format.asprintf
            "%a_node_from_%s"
            pp_snapshot_history_mode
            history_mode
            filename
        in
        let* fresh_node =
          Node.init
            ~name:fresh_node_name
            ~snapshot:(snapshot_dir // filename, false)
            node_arguments
        in
        let* () = Node.wait_for_ready fresh_node in
        let expected_checkpoint, expected_savepoint, expected_caboose =
          match history_mode with
          | Node.Full_history -> (export_level, export_level, 0)
          | Node.Rolling_history ->
              (export_level, export_level, max 0 (export_level - max_op_ttl))
        in
        let* () =
          check_consistency_after_import
            fresh_node
            ~expected_head:export_level
            ~expected_checkpoint
            ~expected_savepoint
            ~expected_caboose
        in
        let* () =
          check_blocks_availability
            fresh_node
            ~history_mode
            ~head:export_level
            ~savepoint:expected_savepoint
            ~caboose:expected_caboose
        in
        let* () = Node.terminate fresh_node in
        return fresh_node)
      node
      export_dirs
  in
  let* () = Node.terminate final_node in
  unit

(* This test aims to:
   - start 3 nodes: an archive, a full and a rolling one,
   - bake few blocks using the archive node as a baker,
   - export all kinds of snapshots (in terms of both history mode and
     export format) from each node,
   - import those snapshots and start the fresh nodes accordingly,
   - check the consistency and data availability of those fresh nodes,
   - re-export from the fresh nodes to re-import and re-check the imported data. *)
let test_export_import_snapshots =
  Protocol.register_test
    ~__FILE__
    ~title:"storage snapshot export and import"
    ~tags:["storage"; "snapshot"; "export"; "import"]
  @@ fun protocol ->
  let archive_node =
    Node.create
      ~name:"archive_node"
      (node_arguments @ Node.[History_mode Archive])
  in
  let full_node =
    Node.create
      ~name:"full_node"
      (node_arguments @ Node.[History_mode (Full None)])
  in
  let rolling_node =
    Node.create
      ~name:"rolling_node"
      (node_arguments @ Node.[History_mode (Rolling None)])
  in
  let cluster = [archive_node; full_node; rolling_node] in
  Cluster.clique cluster ;
  let* () = Cluster.start ~public:true cluster in
  let* client = Client.init ~endpoint:(Node archive_node) () in
  let* () = Client.activate_protocol_and_wait ~protocol client in
  let* preserved_cycles, blocks_per_cycle, max_op_ttl = get_constants client in
  (* Bake enough blocks so that the rolling node caboose is not at the
     genesis anymore. To do so, we need to bake at least 3 cycles,
     after activating the protocol, i.e 3*8 = 24 blocks. *)
  let blocks_to_bake = (preserved_cycles + 1) * blocks_per_cycle in
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* archive_level = Node.get_level archive_node in
  let* () = sync_all_nodes cluster archive_level in
  (* Terminate all nodes to save resources. Note: we may consider
     that exporting a snapshot from a node that is running is an
     actual usecase (such exports are already done in other tests). *)
  let* () = Lwt_list.iter_p Node.terminate cluster in
  let export_import_and_check =
    export_import_and_check ~max_op_ttl ~export_level:archive_level
  in
  let* () =
    Lwt_list.iter_s
      (fun export_format ->
        Lwt_list.iter_s
          (fun (history_mode, nodes) ->
            Lwt_list.iter_s
              (fun node ->
                export_import_and_check node ~history_mode ~export_format)
              nodes)
          [
            (Node.Full_history, [archive_node; full_node]);
            (Node.Rolling_history, [archive_node; full_node; rolling_node]);
          ])
      Node.[Tar; Raw]
  in
  unit

(* This test aims to export and import a rolling snapshot, bake some
   blocks and make sure that the checkpoint, savepoint and caboose are
   well dragged. *)
let test_drag_after_rolling_import =
  Protocol.register_test
    ~__FILE__
    ~title:"storage snapshot drag after rolling import"
    ~tags:["storage"; "snapshot"; "drag"; "rolling"; "import"]
  @@ fun protocol ->
  let archive_node =
    Node.create
      ~name:"archive_node"
      (node_arguments @ Node.[History_mode Archive])
  in
  (* The number of additional cycles is expected to reflect the
     default value from
     Shell_services.History_modes.default_additional_cycles. *)
  let additional_cycles = 1 in
  let rolling_node =
    Node.create
      ~name:"rolling_node"
      (node_arguments @ Node.[History_mode (Rolling (Some additional_cycles))])
  in
  let cluster = [archive_node; rolling_node] in
  Cluster.clique cluster ;
  let* () = Cluster.start ~public:true cluster in
  let* client = Client.init ~endpoint:(Node archive_node) () in
  let* () = Client.activate_protocol_and_wait ~protocol client in
  let* preserved_cycles, blocks_per_cycle, max_op_ttl = get_constants client in
  Log.info "Baking a few blocks"
  (* Baking enough blocks so that the caboose is not the genesis
     anymore (depending on the max_op_ttl)*) ;
  let blocks_to_bake = (1 * blocks_per_cycle) + max_op_ttl in
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* archive_level = Node.get_level archive_node in
  let* () = sync_all_nodes [archive_node; rolling_node] archive_level in
  let* export_level = Node.get_level archive_node in
  let snapshot_dir = Temp.dir "snapshots_exports" in
  let history_mode = Node.Rolling_history in
  Log.info "Exporting snapshot at level %d" export_level ;
  let* filename =
    export_snapshot
      rolling_node
      ~export_level
      ~snapshot_dir
      ~history_mode
      ~export_format:Node.Tar
  in
  let fresh_node_name =
    Format.asprintf
      "%a_node_from_%s"
      pp_snapshot_history_mode
      history_mode
      filename
  in
  let* fresh_node =
    Node.init
      ~name:fresh_node_name
      ~snapshot:(snapshot_dir // filename, false)
      node_arguments
  in
  (* Baking a few blocks so that the caboose is not the genesis
     anymore. *)
  let blocks_to_bake =
    (preserved_cycles + additional_cycles) * blocks_per_cycle
  in
  let expected_checkpoint, expected_savepoint, expected_caboose =
    (export_level, export_level, max 0 (export_level - max_op_ttl))
  in
  let* () =
    check_consistency_after_import
      fresh_node
      ~expected_head:export_level
      ~expected_checkpoint
      ~expected_savepoint
      ~expected_caboose
  in
  let* () =
    check_blocks_availability
      fresh_node
      ~history_mode
      ~head:export_level
      ~savepoint:expected_savepoint
      ~caboose:expected_caboose
  in
  let* () = bake_blocks archive_node client ~blocks_to_bake in
  let* () = Client.Admin.connect_address ~peer:fresh_node client in
  let* expected_head = Node.get_level archive_node in
  let* (_ : int) = Node.wait_for_level fresh_node expected_head in
  let expected_checkpoint, expected_savepoint, expected_caboose =
    match history_mode with
    | Node.Full_history -> Test.fail "testing only rolling mode"
    | Node.Rolling_history ->
        let checkpoint =
          expected_head - (preserved_cycles * blocks_per_cycle)
        in
        let savepoint =
          expected_head
          - ((preserved_cycles + additional_cycles) * blocks_per_cycle)
        in
        (checkpoint, savepoint, savepoint - max_op_ttl)
  in
  let* () =
    check_consistency_after_import
      fresh_node
      ~expected_head
      ~expected_checkpoint
      ~expected_savepoint
      ~expected_caboose
  in
  let* () =
    check_blocks_availability
      fresh_node
      ~history_mode
      ~head:export_level
      ~savepoint:expected_savepoint
      ~caboose:expected_caboose
  in
  unit

(* Checks that the hash, level and version contained in the snapshot
   is consistent with regard to the exported data. *)
let test_info_command =
  Protocol.register_test
    ~__FILE__
    ~title:(Format.asprintf "storage snapshot info command")
    ~tags:["storage"; "snapshot"; "info"; "command"]
  @@ fun protocol ->
  let* node = Node.init ~name:"node" node_arguments in
  let* client = Client.init ~endpoint:(Node node) () in
  let* () = Client.activate_protocol_and_wait ~protocol ~node client in
  let blocks_to_bake = 8 in
  let* () = bake_blocks node client ~blocks_to_bake in
  let* head = RPC.call node @@ RPC.get_chain_block () in
  let head_level = JSON.(head |-> "header" |-> "level" |> as_int) in
  let snapshot_dir = Temp.dir "snapshots_exports" in
  let* filename =
    export_snapshot
      node
      ~export_level:head_level
      ~snapshot_dir
      ~history_mode:Node.Rolling_history
      ~export_format:Node.Tar
  in
  let expected_hash = JSON.(head |-> "hash" |> as_string) in
  let expected_level = head_level in
  let expected_version = 6 in
  Log.info "Checks the human formatted output" ;
  (* Get the info line, which is the second line. *)
  let* () =
    let p = Node.spawn_snapshot_info node (snapshot_dir // filename) in
    let* human_output = Process.check_and_read_stdout p in
    let human_output = List.nth (String.split_on_char '\n' human_output) 1 in
    let re =
      rex
        "chain \\w+, block hash (\\w+) at level (\\d+),.* \\(snapshot version \
         (\\d+)\\)"
    in
    match human_output =~*** re with
    | Some (hash, level, version) ->
        Check.(
          (hash = expected_hash)
            string
            ~error_msg:"expected block hash %R, got %L" ;
          (int_of_string level = expected_level)
            int
            ~error_msg:"expected block level %R, got %L" ;
          (int_of_string version = expected_version)
            int
            ~error_msg:"expected version \"%R\", got %L") ;
        unit
    | None -> Test.fail "Could not parse output %s" human_output
  in
  Log.info "Checks the JSON formatted output" ;
  let* () =
    let p =
      Node.spawn_snapshot_info ~json:true node (snapshot_dir // filename)
    in
    let* json_output = Process.check_and_read_stdout p in
    let json_output =
      JSON.parse ~origin:"node snapshot info --json" json_output
    in
    let json_snapshot_header = JSON.(json_output |-> "snapshot_header") in
    let hash = JSON.(json_snapshot_header |-> "block_hash" |> as_string) in
    let level = JSON.(json_snapshot_header |-> "level" |> as_int) in
    let version = JSON.(json_snapshot_header |-> "version" |> as_int) in
    Check.((hash = expected_hash) string)
      ~error_msg:"expected block hash %R not found" ;
    Check.((level = expected_level) int)
      ~error_msg:"expected block level %R not found" ;
    Check.((version = expected_version) int)
      ~error_msg:"expected version \"%R\" not found" ;
    unit
  in
  unit

let register ~protocols =
  test_export_import_snapshots protocols ;
  test_drag_after_rolling_import protocols ;
  test_info_command protocols
back to top