client_proto_stresstest_commands.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2021 Nomadic Labs <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
open Protocol
open Alpha_context
type transfer_strategy =
| Fixed_amount of {mutez : Tez.t} (** Amount to transfer *)
| Evaporation of {fraction : float}
(** Maximum fraction of current wealth to transfer.
Minimum amount is 1 mutez regardless of total wealth. *)
type limit =
| Abs of int (** Absolute level at which we should stop *)
| Rel of int (** Relative number of level before stopping *)
type parameters = {
seed : int;
fresh_probability : float;
(** Per-transfer probability that the destination will be fresh *)
tps : float; (** Transaction per seconds target *)
strategy : transfer_strategy;
fee_mutez : Tez.t; (** fees for each transfer, in mutez *)
gas_limit : Gas.Arith.integral; (** gas limit per operation *)
storage_limit : Z.t; (** storage limit per operation *)
account_creation_storage : Z.t;
(** upper bound on bytes consumed when creating a tz1 account *)
total_transfers : int option;
(** total number of transfers to perform; unbounded if None *)
single_op_per_pkh_per_block : bool;
(** if true, a single operation will be injected by pkh by block to
improve the chance for the injected operations to be included in the
next block *)
level_limit : limit option;
(** total number of levels during which the stresstest is run;
unbounded if None *)
}
type origin = Explicit | Wallet_pkh | Wallet_alias of string
type source = {
pkh : public_key_hash;
pk : public_key;
sk : Tezos_crypto.Signature.V0.secret_key;
}
type input_source =
| Explicit of source
| Wallet_alias of string
| Wallet_pkh of public_key_hash
type source_origin = {source : source; origin : origin}
type transfer = {
src : source;
dst : public_key_hash;
fee : Tez.t;
amount : Tez.t;
counter : Z.t option;
fresh_dst : bool;
}
type state = {
current_head_on_start : Block_hash.t;
counters :
(Block_hash.t * Z.t) Tezos_crypto.Signature.V0.Public_key_hash.Table.t;
mutable pool : source_origin list;
mutable pool_size : int;
(** [Some l] if [single_op_per_pkh_per_block] is true *)
mutable shuffled_pool : source list option;
mutable revealed : Tezos_crypto.Signature.V0.Public_key_hash.Set.t;
mutable last_block : Block_hash.t;
mutable last_level : int;
new_block_condition : unit Lwt_condition.t;
injected_operations : Operation_hash.t list Block_hash.Table.t;
}
let verbose = ref false
let debug = ref false
let debug_msg msg = if !debug then msg () else Lwt.return_unit
let default_parameters =
{
seed = 0x533D;
fresh_probability = 0.001;
tps = 5.0;
strategy = Fixed_amount {mutez = Tez.one};
fee_mutez = Tez.of_mutez_exn 2_000L;
gas_limit = Gas.Arith.integral_of_int_exn 1_600;
(* [gas_limit] corresponds to a slight overapproximation of the
gas needed to inject an operation. It was obtained by simulating
the operation using the client. *)
storage_limit = Z.zero;
account_creation_storage = Z.of_int 300;
(* [account_creation_storage] corresponds to a slight overapproximation
of the storage consumed when allocating a new implicit account.
It was obtained by simulating the operation using the client. *)
total_transfers = None;
single_op_per_pkh_per_block = false;
level_limit = None;
}
let input_source_encoding =
let open Data_encoding in
union
[
case
~title:"explicit"
(Tag 0)
(obj3
(req "pkh" Tezos_crypto.Signature.V0.Public_key_hash.encoding)
(req "pk" Tezos_crypto.Signature.V0.Public_key.encoding)
(req "sk" Tezos_crypto.Signature.V0.Secret_key.encoding))
(function Explicit {pkh; pk; sk} -> Some (pkh, pk, sk) | _ -> None)
(fun (pkh, pk, sk) -> Explicit {pkh; pk; sk});
case
~title:"alias"
(Tag 1)
(obj1 (req "alias" Data_encoding.string))
(function Wallet_alias alias -> Some alias | _ -> None)
(fun alias -> Wallet_alias alias);
case
~title:"pkh"
(Tag 2)
(obj1 (req "pkh" Tezos_crypto.Signature.V0.Public_key_hash.encoding))
(function Wallet_pkh pkh -> Some pkh | _ -> None)
(fun pkh -> Wallet_pkh pkh);
]
let input_source_list_encoding = Data_encoding.list input_source_encoding
let injected_operations_encoding =
let open Data_encoding in
list
(obj2
(req "block_hash_when_injected" Block_hash.encoding)
(req "operation_hashes" (list Operation_hash.encoding)))
let parse_strategy s =
match String.split ~limit:1 ':' s with
| ["fixed"; parameter] -> (
match int_of_string parameter with
| exception _ -> Error "invalid integer literal"
| mutez when mutez <= 0 -> Error "negative amount"
| mutez -> (
match Tez.of_mutez (Int64.of_int mutez) with
| None -> Error "invalid mutez"
| Some mutez -> Ok (Fixed_amount {mutez})))
| ["evaporation"; parameter] -> (
match float_of_string parameter with
| exception _ -> Error "invalid float literal"
| fraction when fraction < 0.0 || fraction > 1.0 ->
Error "invalid evaporation rate"
| fraction -> Ok (Evaporation {fraction}))
| _ -> Error "invalid argument"
(** This command uses two different data structures for sources:
- The in-output files one,
- The normalized one.
The data structure used for in-output files does not directly contain the
data required to forge operations. For efficiency purposes, the sources are
converted into a normalized data structure that contains all the required
data to forge operations and the format originally used to be able to
revert this conversion. *)
(** [normalize_source cctxt src] converts [src] from in-output data structure
to normalized one. If the conversion fails, [None] is returned and a
warning message is printed in [cctxt].
Only unencrypted and encrypted sources from the wallet of [cctxt] are
supported. *)
let normalize_source cctxt =
let sk_of_sk_uri sk_uri =
match
Tezos_crypto.Signature.V0.Secret_key.of_b58check
(Uri.path (sk_uri : Client_keys_v0.sk_uri :> Uri.t))
with
| Ok sk -> Lwt.return_some sk
| Error _ -> (
Tezos_signer_backends.Encrypted.decrypt cctxt sk_uri >|= function
| Error _ -> None
| Ok sk -> Tezos_crypto.Signature.V0.Of_V_latest.secret_key sk)
in
let key_from_alias alias =
let warning msg alias =
cctxt#warning msg alias >>= fun () -> Lwt.return_none
in
(Client_keys_v0.alias_keys cctxt alias >>= function
| Error _ | Ok None -> warning "Alias \"%s\" not found in the wallet" alias
| Ok (Some (_, None, _)) | Ok (Some (_, _, None)) ->
warning
"Alias \"%s\" does not contain public or secret key and could not \
be used for stresstest"
alias
| Ok (Some (pkh, Some pk, Some sk_uri)) -> (
sk_of_sk_uri sk_uri >>= function
| None ->
warning
"Cannot extract the secret key form the alias \"%s\" of the \
wallet"
alias
| Some sk ->
Lwt.return_some
{source = {pkh; pk; sk}; origin = Wallet_alias alias}))
>>= function
| None -> warning "Source given as alias \"%s\" ignored" alias
| key -> Lwt.return key
in
let key_from_wallet pkh =
let warning msg pkh =
cctxt#warning msg Tezos_crypto.Signature.V0.Public_key_hash.pp pkh
>>= fun () -> Lwt.return_none
in
(Client_keys_v0.get_key cctxt pkh >>= function
| Error _ -> warning "Pkh \"%a\" not found in the wallet" pkh
| Ok (alias, pk, sk_uri) -> (
sk_of_sk_uri sk_uri >>= function
| None ->
cctxt#warning
"Cannot extract the secret key form the pkh \"%a\" (alias: \
\"%s\") of the wallet"
Tezos_crypto.Signature.V0.Public_key_hash.pp
pkh
alias
>>= fun () -> Lwt.return_none
| Some sk ->
Lwt.return_some {source = {pkh; pk; sk}; origin = Wallet_pkh}))
>>= function
| None -> warning "Source given as pkh \"%a\" ignored" pkh
| key -> Lwt.return key
in
function
| Explicit source -> Lwt.return_some {source; origin = Explicit}
| Wallet_alias alias -> key_from_alias alias
| Wallet_pkh pkh -> key_from_wallet pkh
(** [unnormalize_source src_org] converts [src_org] from normalized data
structure to in-output one. *)
let unnormalize_source src_org =
match src_org.origin with
| Explicit -> Explicit src_org.source
| Wallet_pkh -> Wallet_pkh src_org.source.pkh
| Wallet_alias alias -> Wallet_alias alias
(** Samples from [state.pool]. Used to generate the destination of a
transfer, and its source only when [state.shuffled_pool] is [None]
meaning that [--single-op-per-pkh-per-block] is not set. *)
let sample_any_source_from_pool state rng =
let idx = Random.State.int rng state.pool_size in
match List.nth state.pool idx with
| None -> assert false
| Some src_org -> Lwt.return src_org.source
(** Generates the source of a transfer. If [state.shuffled_pool] has a
value (meaning that [--single-op-per-pkh-per-block] is active) then
it is sampled from there, otherwise from [state.pool]. *)
let rec sample_source_from_pool state rng (cctxt : Protocol_client_context.full)
=
match state.shuffled_pool with
| None -> sample_any_source_from_pool state rng
| Some (source :: l) ->
state.shuffled_pool <- Some l ;
debug_msg (fun () ->
cctxt#message
"sample_transfer: %d unused sources for the block next to %a"
(List.length l)
Block_hash.pp
state.last_block)
>>= fun () -> Lwt.return source
| Some [] ->
cctxt#message
"all available sources have been used for block next to %a"
Block_hash.pp
state.last_block
>>= fun () ->
Lwt_condition.wait state.new_block_condition >>= fun () ->
sample_source_from_pool state rng cctxt
let random_seed rng =
Bytes.init 32 (fun _ -> Char.chr (Random.State.int rng 256))
let generate_fresh_source pool rng =
let seed = random_seed rng in
let pkh, pk, sk = Tezos_crypto.Signature.V0.generate_key ~seed () in
let fresh = {source = {pkh; pk; sk}; origin = Explicit} in
pool.pool <- fresh :: pool.pool ;
pool.pool_size <- pool.pool_size + 1 ;
fresh.source
(* [heads_iter cctxt f] calls [f head] each time there is a new head
received by the streamed RPC /monitor/heads/main *)
let heads_iter (cctxt : Protocol_client_context.full)
(f : Block_hash.t * Tezos_base.Block_header.t -> unit Lwt.t) :
unit tzresult Lwt.t =
let open Lwt_result_syntax in
Error_monad.protect
(fun () ->
let* heads_stream, stopper = Shell_services.Monitor.heads cctxt `Main in
let rec loop () : unit tzresult Lwt.t =
let*! block_hash_and_header = Lwt_stream.get heads_stream in
match block_hash_and_header with
| None -> Error_monad.failwith "unexpected end of block stream@."
| Some ((new_block_hash, _block_header) as block_hash_and_header) ->
Lwt.catch
(fun () ->
let*! () =
debug_msg (fun () ->
cctxt#message
"heads_iter: new block received %a@."
Block_hash.pp
new_block_hash)
in
let* protocols =
Shell_services.Blocks.protocols
cctxt
~block:(`Hash (new_block_hash, 0))
()
in
if Protocol_hash.(protocols.current_protocol = Protocol.hash)
then
let*! () = f block_hash_and_header in
loop ()
else
let*! () =
debug_msg (fun () ->
cctxt#message
"heads_iter: new block on protocol %a. Stopping \
iteration.@."
Protocol_hash.pp
protocols.current_protocol)
in
return_unit)
(fun exn ->
Error_monad.failwith
"An exception occured on a function bound on new heads : %s@."
(Printexc.to_string exn))
in
let* () = loop () in
stopper () ;
let*! () =
debug_msg (fun () ->
cctxt#message
"head iteration for proto %a stopped@."
Protocol_hash.pp
Protocol.hash)
in
return_unit)
~on_error:(fun trace ->
cctxt#error
"An error while monitoring the new heads for proto %a occured: %a@."
Protocol_hash.pp
Protocol.hash
Error_monad.pp_print_trace
trace)
(* We perform rejection sampling of valid sources.
We could maintain a local cache of existing contracts with sufficient balance. *)
let rec sample_transfer (cctxt : Protocol_client_context.full) chain block
(parameters : parameters) (state : state) rng =
sample_source_from_pool state rng cctxt >>= fun src ->
Alpha_services.Contract.balance
cctxt
(chain, block)
(Contract.implicit_contract src.pkh)
>>=? fun tez ->
if Tez.(tez = zero) then
debug_msg (fun () ->
cctxt#message
"sample_transfer: invalid balance %a"
Tezos_crypto.Signature.V0.Public_key_hash.pp
src.pkh)
>>= fun () ->
(* Sampled source has zero balance: the transfer that created that
address was not included yet. Retry *)
sample_transfer cctxt chain block parameters state rng
else
let fresh = Random.State.float rng 1.0 < parameters.fresh_probability in
(if fresh then Lwt.return (generate_fresh_source state rng)
else sample_any_source_from_pool state rng)
>>= fun dest ->
let amount =
match parameters.strategy with
| Fixed_amount {mutez} -> mutez
| Evaporation {fraction} ->
let mutez = Int64.to_float (Tez.to_mutez tez) in
let max_fraction = Int64.of_float (mutez *. fraction) in
let amount =
if max_fraction = 0L then 1L
else max 1L (Random.State.int64 rng max_fraction)
in
Tez.of_mutez_exn amount
in
let fee = parameters.fee_mutez in
return {src; dst = dest.pkh; fee; amount; counter = None; fresh_dst = fresh}
let inject_contents (cctxt : Protocol_client_context.full) chain branch sk
contents =
let bytes =
Data_encoding.Binary.to_bytes_exn
Operation.unsigned_encoding
({branch}, Contents_list contents)
in
let signature =
Some
(Tezos_crypto.Signature.V0.sign
~watermark:Tezos_crypto.Signature.V0.Generic_operation
sk
bytes)
in
let op : _ Operation.t =
{shell = {branch}; protocol_data = {contents; signature}}
in
let bytes =
Data_encoding.Binary.to_bytes_exn Operation.encoding (Operation.pack op)
in
Shell_services.Injection.operation cctxt ~chain bytes
(* counter _must_ be set before calling this function *)
let manager_op_of_transfer parameters
{src; dst; fee; amount; counter; fresh_dst} =
let source = src.pkh in
let gas_limit = parameters.gas_limit in
let storage_limit =
if fresh_dst then
Z.add parameters.account_creation_storage parameters.storage_limit
else parameters.storage_limit
in
let operation =
let parameters =
let open Tezos_micheline in
Script.lazy_expr
@@ Micheline.strip_locations
(Prim (0, Michelson_v1_primitives.D_Unit, [], []))
in
let entrypoint = "default" in
let destination = Contract.implicit_contract dst in
Transaction {amount; parameters; entrypoint; destination}
in
match counter with
| None -> assert false
| Some counter ->
Manager_operation
{source; fee; counter; operation; gas_limit; storage_limit}
let cost_of_manager_operation = Gas.Arith.integral_of_int_exn 1_000
let inject_transfer (cctxt : Protocol_client_context.full) parameters state rng
chain block transfer =
Alpha_services.Contract.counter cctxt (chain, block) transfer.src.pkh
>>=? fun pcounter ->
Shell_services.Blocks.hash cctxt ~chain ~block () >>=? fun branch ->
(* If there is a new block refresh the fresh_pool *)
if not (Block_hash.equal branch state.last_block) then (
state.last_block <- branch ;
if Option.is_some state.shuffled_pool then
state.shuffled_pool <-
Some
(List.shuffle
~rng
(List.map (fun src_org -> src_org.source) state.pool))) ;
let freshest_counter =
match
Tezos_crypto.Signature.V0.Public_key_hash.Table.find
state.counters
transfer.src.pkh
with
| None ->
(* This is the first operation we inject for this pkh: the counter given
by the RPC _must_ be the freshest one. *)
pcounter
| Some (previous_branch, previous_counter) ->
if Block_hash.equal branch previous_branch then
(* We already injected an operation on top of this block: the one stored
locally is the freshest one. *)
previous_counter
else
(* It seems the block changed since we last injected an operation:
this invalidates the previously stored counter. We return the counter
given by the RPC. *)
pcounter
in
(if
Tezos_crypto.Signature.V0.Public_key_hash.Set.mem
transfer.src.pkh
state.revealed
then return true
else (
(* Either the [manager_key] RPC tells us the key is already
revealed, or we immediately inject a reveal operation: in any
case the key is revealed in the end. *)
state.revealed <-
Tezos_crypto.Signature.V0.Public_key_hash.Set.add
transfer.src.pkh
state.revealed ;
Alpha_services.Contract.manager_key cctxt (chain, block) transfer.src.pkh
>>=? fun pk_opt -> return (Option.is_some pk_opt)))
>>=? fun already_revealed ->
(if not already_revealed then (
let reveal_counter = Z.succ freshest_counter in
let transf_counter = Z.succ reveal_counter in
let reveal =
Manager_operation
{
source = transfer.src.pkh;
fee = Tez.zero;
counter = reveal_counter;
gas_limit = cost_of_manager_operation;
storage_limit = Z.zero;
operation = Reveal transfer.src.pk;
}
in
let manager_op =
manager_op_of_transfer
parameters
{transfer with counter = Some transf_counter}
in
let list = Cons (reveal, Single manager_op) in
Tezos_crypto.Signature.V0.Public_key_hash.Table.remove
state.counters
transfer.src.pkh ;
Tezos_crypto.Signature.V0.Public_key_hash.Table.add
state.counters
transfer.src.pkh
(branch, transf_counter) ;
(if !verbose then
cctxt#message
"injecting reveal+transfer from %a (counters=%a,%a) to %a"
Tezos_crypto.Signature.V0.Public_key_hash.pp
transfer.src.pkh
Z.pp_print
reveal_counter
Z.pp_print
transf_counter
Tezos_crypto.Signature.V0.Public_key_hash.pp
transfer.dst
else Lwt.return_unit)
>>= fun () ->
(* NB: regardless of our best efforts to keep track of counters, injection can fail with
"counter in the future" if a block switch happens in between the moment we
get the branch and the moment we inject, and the new block does not include
all the operations we injected. *)
inject_contents cctxt chain branch transfer.src.sk list)
else
let transf_counter = Z.succ freshest_counter in
let manager_op =
manager_op_of_transfer
parameters
{transfer with counter = Some transf_counter}
in
let list = Single manager_op in
Tezos_crypto.Signature.V0.Public_key_hash.Table.remove
state.counters
transfer.src.pkh ;
Tezos_crypto.Signature.V0.Public_key_hash.Table.add
state.counters
transfer.src.pkh
(branch, transf_counter) ;
(if !verbose then
cctxt#message
"injecting transfer from %a (counter=%a) to %a"
Tezos_crypto.Signature.V0.Public_key_hash.pp
transfer.src.pkh
Z.pp_print
transf_counter
Tezos_crypto.Signature.V0.Public_key_hash.pp
transfer.dst
else Lwt.return_unit)
>>= fun () ->
(* See comment above. *)
inject_contents cctxt chain branch transfer.src.sk list)
>>= function
| Ok op_hash ->
debug_msg (fun () ->
cctxt#message
"inject_transfer: op injected %a"
Operation_hash.pp
op_hash)
>>= fun () ->
let ops =
Option.value
~default:[]
(Block_hash.Table.find state.injected_operations branch)
in
Block_hash.Table.replace state.injected_operations branch (op_hash :: ops) ;
return_unit
| Error e ->
debug_msg (fun () ->
cctxt#message
"inject_transfer: error, op not injected: %a"
Error_monad.pp_print_trace
e)
>>= fun () -> return_unit
let save_injected_operations (cctxt : Protocol_client_context.full) state =
let json =
Data_encoding.Json.construct
injected_operations_encoding
(Block_hash.Table.fold
(fun k v acc -> (k, v) :: acc)
state.injected_operations
[])
in
let path =
Filename.temp_file "client-stresstest-injected_operations-" ".json"
in
cctxt#message "writing injected operations in file %s" path >>= fun () ->
Lwt_utils_unix.Json.write_file path json >>= function
| Error e ->
cctxt#message
"could not write injected operations json file: %a"
Error_monad.pp_print_trace
e
| Ok _ -> Lwt.return_unit
let stat_on_exit (cctxt : Protocol_client_context.full) state =
let ratio_injected_included_op () =
Shell_services.Blocks.hash cctxt () >>=? fun current_head_on_exit ->
let inter_cardinal s1 s2 =
Operation_hash.Set.cardinal
(Operation_hash.Set.inter
(Operation_hash.Set.of_list s1)
(Operation_hash.Set.of_list s2))
in
let get_included_ops older_block =
let rec get_included_ops block acc_included_ops =
if block = older_block then return acc_included_ops
else
Shell_services.Chain.Blocks.Operation_hashes.operation_hashes_in_pass
cctxt
~chain:`Main
~block:(`Hash (block, 0))
3
>>=? fun included_ops ->
Shell_services.Blocks.list
cctxt
~chain:`Main
~heads:[block]
~length:2
()
>>=? function
| [[current; predecessor]] when current = block ->
get_included_ops
predecessor
(List.append acc_included_ops included_ops)
| _ -> cctxt#error "Error while computing stats: invalid block list"
in
get_included_ops current_head_on_exit []
in
let injected_ops =
Block_hash.Table.fold
(fun k l acc ->
(* The operations injected during the last block are ignored because
they should not be currently included. *)
if current_head_on_exit <> k then List.append acc l else acc)
state.injected_operations
[]
in
get_included_ops state.current_head_on_start >>=? fun included_ops ->
let included_ops_count = inter_cardinal injected_ops included_ops in
debug_msg (fun () ->
cctxt#message
"injected : %a\nincluded: %a"
(Format.pp_print_list Operation_hash.pp)
injected_ops
(Format.pp_print_list Operation_hash.pp)
included_ops)
>>= fun () ->
let injected_ops_count = List.length injected_ops in
cctxt#message
"%s of the injected operations have been included (%d injected, %d \
included). Note that the operations injected during the last block are \
ignored because they should not be currently included."
(if Int.equal injected_ops_count 0 then "N/A"
else Format.sprintf "%d%%" (included_ops_count * 100 / injected_ops_count))
injected_ops_count
included_ops_count
>>= fun () -> return_unit
in
ratio_injected_included_op ()
let launch (cctxt : Protocol_client_context.full) (parameters : parameters)
state rng save_pool_callback =
let injected = ref 0 in
let target_level =
match parameters.level_limit with
| None -> None
| Some (Abs target) -> Some target
| Some (Rel offset) -> Some (state.last_level + offset)
in
let dt = 1. /. parameters.tps in
let terminated () =
if
match parameters.total_transfers with
| None -> false
| Some bound -> bound <= !injected
then
cctxt#message
"Stopping after %d injections (target %a)."
!injected
Format.(pp_print_option pp_print_int)
parameters.total_transfers
>>= fun () -> Lwt.return_true
else
match target_level with
| None -> Lwt.return_false
| Some target ->
if target <= state.last_level then
cctxt#message
"Stopping at level %d (target level: %d)."
state.last_level
target
>>= fun () -> Lwt.return_true
else Lwt.return_false
in
let rec loop () =
terminated () >>= fun terminated ->
if terminated then
save_pool_callback () >>= fun () ->
save_injected_operations cctxt state >>= fun () ->
stat_on_exit cctxt state
else
let start = Mtime_clock.elapsed () in
debug_msg (fun () -> cctxt#message "launch.loop: invoke sample_transfer")
>>= fun () ->
sample_transfer cctxt cctxt#chain cctxt#block parameters state rng
>>=? fun transfer ->
debug_msg (fun () -> cctxt#message "launch.loop: invoke inject_transfer")
>>= fun () ->
inject_transfer
cctxt
parameters
state
rng
cctxt#chain
cctxt#block
transfer
>>=? fun () ->
incr injected ;
let stop = Mtime_clock.elapsed () in
let elapsed = Mtime.Span.(to_s stop -. to_s start) in
let remaining = dt -. elapsed in
(if remaining <= 0.0 then
cctxt#warning
"warning: tps target could not be reached, consider using a lower \
value for --tps"
else Lwt_unix.sleep remaining)
>>= loop
in
let on_new_head : Block_hash.t * Tezos_base.Block_header.t -> unit Lwt.t =
match state.shuffled_pool with
(* Some _ if and only if [single_op_per_pkh_per_block] is true. *)
| Some _ ->
fun (new_block_hash, new_block_header) ->
if not (Block_hash.equal new_block_hash state.last_block) then (
state.last_block <- new_block_hash ;
state.last_level <- Int32.to_int new_block_header.shell.level ;
state.shuffled_pool <-
Some
(List.shuffle
~rng
(List.map (fun src_org -> src_org.source) state.pool))) ;
Lwt_condition.broadcast state.new_block_condition () ;
Lwt.return_unit
| None ->
(* only wait for the end of the head stream; don't act on heads *)
fun _ -> Lwt.return_unit
in
let heads_iteration = heads_iter cctxt on_new_head in
(* The head iteration stops at protocol change. *)
Lwt.pick [loop (); heads_iteration]
let group =
Tezos_clic.
{name = "stresstest"; title = "Commands for stress-testing the network"}
type pool_source =
| From_string of {json : Ezjsonm.value}
| From_file of {path : string; json : Ezjsonm.value}
let json_of_pool_source = function
| From_string {json} | From_file {json; _} -> json
let json_file_or_text_parameter =
Tezos_clic.parameter (fun _ p ->
match String.split ~limit:1 ':' p with
| ["text"; text] -> return (From_string {json = Ezjsonm.from_string text})
| ["file"; path] ->
Lwt_utils_unix.Json.read_file path >|=? fun json ->
From_file {path; json}
| _ -> (
if Sys.file_exists p then
Lwt_utils_unix.Json.read_file p >|=? fun json ->
From_file {path = p; json}
else
try return (From_string {json = Ezjsonm.from_string p})
with Ezjsonm.Parse_error _ ->
failwith "Neither an existing file nor valid JSON: '%s'" p))
let seed_arg =
let open Tezos_clic in
arg
~long:"seed"
~placeholder:"int"
~doc:"random seed"
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match int_of_string s with
| exception _ ->
cctxt#error
"While parsing --seed: could not convert argument to int"
| i -> return i))
let tps_arg =
let open Tezos_clic in
arg
~long:"tps"
~placeholder:"float"
~doc:"transactions per seconds target"
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match float_of_string s with
| exception _ ->
cctxt#error
"While parsing --tps: could not convert argument to float"
| f when f < 0.0 ->
cctxt#error "While parsing --tps: negative argument"
| f -> return f))
let fresh_probability_arg =
let open Tezos_clic in
arg
~long:"fresh-probability"
~placeholder:"float in [0;1]"
~doc:
(Format.sprintf
"Probability for each transaction's destination to be a fresh \
account. The default value is %g. This new account may then be used \
as source or destination of subsequent transactions, just like the \
accounts that were initially provided to the command. Note that when \
[--single-op-per-pkh-per-block] is set, the new account will not be \
used as source until the head changes."
default_parameters.fresh_probability)
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match float_of_string s with
| exception _ ->
cctxt#error
"While parsing --fresh-probability: could not convert argument \
to float"
| f when f < 0.0 || f > 1.0 ->
cctxt#error "While parsing --fresh-probability: invalid argument"
| f -> return f))
let strategy_arg =
let open Tezos_clic in
arg
~long:"strategy"
~placeholder:"fixed:mutez | evaporation:[0;1]"
~doc:"wealth redistribution strategy"
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match parse_strategy s with
| Error msg -> cctxt#error "While parsing --strategy: %s" msg
| Ok strategy -> return strategy))
let gas_limit_arg =
let open Tezos_clic in
let gas_limit_kind =
parameter (fun _ s ->
try
let v = Z.of_string s in
return (Gas.Arith.integral_exn v)
with _ -> failwith "invalid gas limit (must be a positive number)")
in
arg
~long:"gas-limit"
~short:'G'
~placeholder:"amount"
~doc:
(Format.asprintf
"Set the gas limit of the transaction instead of using the default \
value of %a"
Gas.Arith.pp_integral
default_parameters.gas_limit)
gas_limit_kind
let storage_limit_arg =
let open Tezos_clic in
let storage_limit_kind =
parameter (fun _ s ->
try
let v = Z.of_string s in
assert (Compare.Z.(v >= Z.zero)) ;
return v
with _ ->
failwith "invalid storage limit (must be a positive number of bytes)")
in
arg
~long:"storage-limit"
~short:'S'
~placeholder:"amount"
~doc:
(Format.asprintf
"Set the storage limit of the transaction instead of using the \
default value of %a"
Z.pp_print
default_parameters.storage_limit)
storage_limit_kind
let transfers_arg =
let open Tezos_clic in
arg
~long:"transfers"
~placeholder:"integer"
~doc:"total number of transfers to perform, unbounded if not specified"
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match int_of_string s with
| exception _ ->
cctxt#error "While parsing --transfers: invalid integer literal"
| i when i <= 0 ->
cctxt#error "While parsing --transfers: negative integer"
| i -> return i))
let single_op_per_pkh_per_block_arg =
Tezos_clic.switch
~long:"single-op-per-pkh-per-block"
~doc:
"ensure that the operations are not rejected by limiting the injection \
to 1 operation per public_key_hash per block."
()
let level_limit_arg =
let open Tezos_clic in
arg
~long:"level-limit"
~placeholder:"integer | +integer"
~doc:
"Level at which the stresstest will stop (if prefixed by '+', the level \
is relative to the current head)"
(parameter (fun (cctxt : Protocol_client_context.full) s ->
match int_of_string s with
| exception _ ->
cctxt#error "While parsing --levels: invalid integer literal"
| i when i <= 0 ->
cctxt#error "While parsing --levels: negative integer"
| i -> if String.get s 0 = '+' then return (Rel i) else return (Abs i)))
let verbose_arg =
Tezos_clic.switch
~long:"verbose"
~doc:"Display detailed logs of the injected operations"
()
let debug_arg = Tezos_clic.switch ~long:"debug" ~doc:"Display debug logs" ()
let set_option opt f x = Option.fold ~none:x ~some:(f x) opt
let save_pool_callback (cctxt : Protocol_client_context.full) pool_source state
=
let json =
Data_encoding.Json.construct
input_source_list_encoding
(List.map unnormalize_source state.pool)
in
let catch_write_error = function
| Error e ->
cctxt#message
"could not write back json file: %a"
Error_monad.pp_print_trace
e
| Ok () -> Lwt.return_unit
in
match pool_source with
| From_string _ ->
(* If the initial pool was given directly as json, save pool to
a temp file. *)
let path = Filename.temp_file "client-stresstest-pool-" ".json" in
cctxt#message "writing back address pool in file %s" path >>= fun () ->
Lwt_utils_unix.Json.write_file path json >>= catch_write_error
| From_file {path; _} ->
(* If the pool specification was a json file, save pool to
the same file. *)
cctxt#message "writing back address pool in file %s" path >>= fun () ->
Lwt_utils_unix.Json.write_file path json >>= catch_write_error
let generate_random_transactions =
let open Tezos_clic in
command
~group
~desc:"Generate random transactions"
(args12
seed_arg
tps_arg
fresh_probability_arg
strategy_arg
Client_proto_args.fee_arg
gas_limit_arg
storage_limit_arg
transfers_arg
single_op_per_pkh_per_block_arg
level_limit_arg
verbose_arg
debug_arg)
(prefixes ["stresstest"; "transfer"; "using"]
@@ param
~name:"sources.json"
~desc:
{|List of accounts from which to perform transfers in JSON format. The input JSON must be an array of objects of the form {"pkh":"<pkh>","pk":"<pk>","sk":"<sk>"} or {"alias":"<alias from wallet>"} or {"pkh":"<pkh from wallet>"} with the pkh, pk and sk encoded in B58 form."|}
json_file_or_text_parameter
@@ stop)
(fun ( seed,
tps,
freshp,
strat,
fee,
gas_limit,
storage_limit,
transfers,
single_op_per_pkh_per_block,
level_limit,
verbose_flag,
debug_flag )
sources_json
(cctxt : Protocol_client_context.full) ->
verbose := verbose_flag ;
debug := debug_flag ;
let parameters =
default_parameters
|> set_option seed (fun parameter seed -> {parameter with seed})
|> set_option tps (fun parameter tps -> {parameter with tps})
|> set_option freshp (fun parameter fresh_probability ->
{parameter with fresh_probability})
|> set_option strat (fun parameter strategy ->
{parameter with strategy})
|> set_option fee (fun parameter fee_mutez ->
{parameter with fee_mutez})
|> set_option gas_limit (fun parameter gas_limit ->
{parameter with gas_limit})
|> set_option storage_limit (fun parameter storage_limit ->
{parameter with storage_limit})
|> set_option transfers (fun parameter transfers ->
{parameter with total_transfers = Some transfers})
|> fun parameter ->
{parameter with single_op_per_pkh_per_block}
|> set_option level_limit (fun parameter level_limit ->
{parameter with level_limit = Some level_limit})
in
match
Data_encoding.Json.destruct
input_source_list_encoding
(json_of_pool_source sources_json)
with
| exception _ -> cctxt#error "Could not decode list of sources"
| [] -> cctxt#error "It is required to provide sources"
| sources ->
(if !verbose then cctxt#message "starting to normalize sources"
else Lwt.return_unit)
>>= fun () ->
List.filter_map_s (normalize_source cctxt) sources >>= fun sources ->
(if !verbose then cctxt#message "all sources have been normalized"
else Lwt.return_unit)
>>= fun () ->
let counters =
Tezos_crypto.Signature.V0.Public_key_hash.Table.create 1023
in
let rng = Random.State.make [|parameters.seed|] in
Protocol_client_context.Alpha_block_services.header cctxt ()
>>=? fun header_on_start ->
let current_head_on_start = header_on_start.hash in
let state =
{
current_head_on_start;
counters;
pool = sources;
pool_size = List.length sources;
shuffled_pool =
(if parameters.single_op_per_pkh_per_block then
Some
(List.shuffle
~rng
(List.map (fun src_org -> src_org.source) sources))
else None);
revealed = Tezos_crypto.Signature.V0.Public_key_hash.Set.empty;
last_block = current_head_on_start;
last_level = Int32.to_int header_on_start.shell.level;
new_block_condition = Lwt_condition.create ();
injected_operations = Block_hash.Table.create 1023;
}
in
let exit_callback_id =
Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _retcode ->
stat_on_exit cctxt state >>= function
| Ok () -> Lwt.return_unit
| Error e ->
cctxt#message "Error: %a" Error_monad.pp_print_trace e)
in
let save_pool () = save_pool_callback cctxt sources_json state in
(* Register a callback for saving the pool when the tool is interrupted
through ctrl-c *)
let exit_callback_id =
Lwt_exit.register_clean_up_callback
~loc:__LOC__
~after:[exit_callback_id]
(fun _retcode -> save_pool ())
in
let save_injected_operations () =
save_injected_operations cctxt state
in
ignore
(Lwt_exit.register_clean_up_callback
~loc:__LOC__
~after:[exit_callback_id]
(fun _retcode -> save_injected_operations ())) ;
launch cctxt parameters state rng save_pool)
let commands = [generate_random_transactions]
let commands network () =
match network with Some `Mainnet -> [] | Some `Testnet | None -> commands