baking_lib.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
open Protocol
open Alpha_context
open Baking_state
(** [create_state cctxt ?synchronize ?monitor_node_mempool ~config
    ~current_proposal delegates] builds the initial baking state.

    An operation worker is created first for [cctxt];
    [monitor_node_mempool] is forwarded to it as its optional
    [monitor_node_operations] argument. The worker, the configuration, the
    current proposal and the delegates are then handed to
    [Baking_scheduling.create_initial_state] for the chain given by
    [cctxt#chain]. *)
let create_state cctxt ?synchronize ?monitor_node_mempool ~config
    ~current_proposal delegates =
  let open Lwt_result_syntax in
  let target_chain = cctxt#chain in
  let*! worker =
    Operation_worker.create
      ?monitor_node_operations:monitor_node_mempool
      cctxt
  in
  Baking_scheduling.create_initial_state
    cctxt
    ?synchronize
    ~chain:target_chain
    config
    worker
    ~current_proposal
    delegates
(** [get_current_proposal cctxt ?cache ()] starts monitoring the heads of
    [cctxt#chain] through [Node_rpc.monitor_heads] and returns the resulting
    stream together with its first element, obtained with
    [Lwt_stream.peek].

    The stopper returned by [Node_rpc.monitor_heads] is discarded. Fails
    with ["head stream unexpectedly ended"] when the stream yields no
    element. *)
let get_current_proposal cctxt ?cache () =
  let open Lwt_result_syntax in
  let* heads, _stopper =
    Node_rpc.monitor_heads cctxt ?cache ~chain:cctxt#chain ()
  in
  let*! first_head = Lwt_stream.peek heads in
  match first_head with
  | None -> failwith "head stream unexpectedly ended"
  | Some head -> return (heads, head)
module Events = Baking_events.Lib
(** [preattest cctxt ?force delegates] injects preattestations for the
    latest proposal on behalf of [delegates].

    A fresh baking state is built from the current head. Unless [force] is
    set (default [false]), the latest proposal must be judged
    [Valid_proposal] by
    [State_transitions.is_acceptable_proposal_for_current_level]; an
    invalid or outdated proposal is reported through [cctxt#error]. *)
let preattest (cctxt : Protocol_client_context.full) ?(force = false) delegates
    =
  let open State_transitions in
  let open Lwt_result_syntax in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _, current_proposal = get_current_proposal cctxt ~cache () in
  let config = Baking_configuration.make ~force () in
  let* state = create_state cctxt ~config ~current_proposal delegates in
  let candidate = state.level_state.latest_proposal in
  let*! () = Events.emit Events.attempting_preattest_proposal candidate in
  let* () =
    if not force then (
      let*! verdict =
        is_acceptable_proposal_for_current_level state candidate
      in
      match verdict with
      | Valid_proposal -> return_unit
      | Outdated_proposal -> cctxt#error "Cannot preattest an outdated proposal"
      | Invalid -> cctxt#error "Cannot preattest an invalid proposal")
    else return_unit
  in
  let to_sign = make_consensus_list state candidate in
  (* Printer for the list of signing keys shown to the user. *)
  let pp_signers =
    Format.pp_print_list
      ~pp_sep:Format.pp_print_space
      Baking_state.pp_consensus_key_and_delegate
  in
  let*! () =
    cctxt#message
      "@[<v 2>Preattesting for:@ %a@]"
      pp_signers
      (List.map fst to_sign)
  in
  Baking_actions.inject_preattestations state ~preattestations:to_sign
(** [attest cctxt ?force delegates] injects attestations for the latest
    proposal on behalf of [delegates].

    A fresh baking state is built from the current head. Unless [force] is
    set (default [false]), the latest proposal must be judged
    [Valid_proposal] by
    [State_transitions.is_acceptable_proposal_for_current_level]; an
    invalid or outdated proposal is reported through [cctxt#error]. Before
    injecting, [Baking_state.may_record_new_state] is called with the same
    state as both previous and new state. *)
let attest (cctxt : Protocol_client_context.full) ?(force = false) delegates =
  let open State_transitions in
  let open Lwt_result_syntax in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _, current_proposal = get_current_proposal cctxt ~cache () in
  let config = Baking_configuration.make ~force () in
  let* state = create_state cctxt ~config ~current_proposal delegates in
  let candidate = state.level_state.latest_proposal in
  let*! () = Events.emit Events.attempting_attest_proposal candidate in
  let* () =
    if not force then (
      let*! verdict =
        is_acceptable_proposal_for_current_level state candidate
      in
      match verdict with
      | Valid_proposal -> return_unit
      | Outdated_proposal -> cctxt#error "Cannot attest an outdated proposal"
      | Invalid -> cctxt#error "Cannot attest an invalid proposal")
    else return_unit
  in
  let to_sign = make_consensus_list state candidate in
  (* Printer for the list of signing keys shown to the user. *)
  let pp_signers =
    Format.pp_print_list
      ~pp_sep:Format.pp_print_space
      Baking_state.pp_consensus_key_and_delegate
  in
  let*! () =
    cctxt#message "@[<v 2>Attesting for:@ %a@]" pp_signers (List.map fst to_sign)
  in
  let* () =
    Baking_state.may_record_new_state ~previous_state:state ~new_state:state
  in
  Baking_actions.inject_attestations state ~attestations:to_sign
(** [bake_at_next_level state] waits for the next potential baking slot at
    the next level and returns the matching
    [Timeout (Time_to_bake_next_level _)] event.

    The slot comes from
    [Baking_scheduling.compute_next_potential_baking_time_at_next_level];
    when there is none, the error is reported through [cctxt#error].
    Otherwise the function sleeps on the promise returned by
    [Baking_scheduling.sleep_until], if any, before returning. *)
let bake_at_next_level state =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  let*! next_slot =
    Baking_scheduling.compute_next_potential_baking_time_at_next_level state
  in
  match next_slot with
  | Some (timestamp, at_round) ->
      let*! () =
        cctxt#message
          "Waiting until %a for round %a"
          Timestamp.pp
          timestamp
          Round.pp
          at_round
      in
      let*! () =
        (* [sleep_until] yields no promise when there is nothing to wait
           for. *)
        match Baking_scheduling.sleep_until timestamp with
        | Some wake_up -> wake_up
        | None -> Lwt.return_unit
      in
      return (Baking_state.Timeout (Time_to_bake_next_level {at_round}))
  | None -> cctxt#error "No baking slot found for the delegates"
(* Simulate the end of the current round to bootstrap the automaton
or attest the block if necessary *)
(** [first_automaton_event state] computes the event used to start the
    automaton: when a block is already elected, wait for and return the
    "bake at next level" event; otherwise return the bootstrap event
    computed by [Baking_scheduling.compute_bootstrap_event]. *)
let first_automaton_event state =
  if Option.is_some state.level_state.elected_block then
    (* An elected block lets us bake directly at the next level, once its
       date has been reached. *)
    bake_at_next_level state
  else Lwt.return (Baking_scheduling.compute_bootstrap_event state)
(** [attestations_attesting_power state attestations] sums the voting power
    attached to the slots of [attestations], after removing duplicates
    with [List.sort_uniq compare].

    Voting powers are looked up in [state.level_state.delegate_slots]; a
    slot with no entry counts for [0] (the original code notes that this
    cannot happen). *)
let attestations_attesting_power state attestations =
  let slots = state.level_state.delegate_slots in
  let power_of {slot; _} =
    Option.value ~default:0 (Delegate_slots.voting_power slots ~slot)
  in
  attestations
  |> List.sort_uniq compare
  |> List.map power_of
  |> List.fold_left ( + ) 0
(** [generic_attesting_power filter extract state] returns
    [(power, operations)] where [operations] is the result of applying
    [filter] to the consensus operations currently held by the operation
    worker, and [power] is the attesting power of those whose consensus
    content (obtained with [extract]) has the round and level of the
    latest proposal.

    Note that [operations] is the whole filtered list, not only the
    operations that matched the latest proposal. *)
let generic_attesting_power (filter : packed_operation list -> 'a list)
    (extract : 'a -> consensus_content) state =
  let mempool =
    Operation_worker.get_current_operations state.global_state.operation_worker
  in
  let proposed_block = state.level_state.latest_proposal.block in
  let target_round = proposed_block.round in
  let target_level = proposed_block.shell.level in
  let operations =
    mempool.consensus |> Operation_pool.Operation_set.elements |> filter
  in
  let concerns_latest_proposal (content : consensus_content) =
    Round.(content.round = target_round)
    && Compare.Int32.(Raw_level.to_int32 content.level = target_level)
  in
  let matching_contents =
    operations |> List.map extract |> List.filter concerns_latest_proposal
  in
  let power = attestations_attesting_power state matching_contents in
  (power, operations)
(** [state_attesting_power state] is {!generic_attesting_power} specialised
    to attestation operations: operations are selected with
    [Operation_pool.filter_attestations] and their consensus content is
    read from the single [Attestation] they carry. *)
let state_attesting_power state =
  let consensus_content_of
      ({protocol_data = {contents = Single (Attestation content); _}; _} :
        Kind.attestation operation) =
    content
  in
  generic_attesting_power
    Operation_pool.filter_attestations
    consensus_content_of
    state
(** [do_action (state, action)] performs [action] from [state] with
    [Baking_actions.perform_action]. The state recorder handed to it calls
    [Baking_state.may_record_new_state] with [state] as the previous
    state. *)
let do_action (state, action) =
  Baking_actions.perform_action
    ~state_recorder:(fun ~new_state ->
      Baking_state.may_record_new_state ~previous_state:state ~new_state)
    state
    action
(** [propose_at_next_level ~minimal_timestamp state] proposes a block at
    the level following the elected block and returns the resulting state.

    [state] must hold an elected block (checked with [assert]).

    - With [minimal_timestamp], a fresh block built from the operation
      worker's current operations is injected right away, at the first
      potential round reported by
      [Baking_scheduling.first_potential_round_at_next_level].
    - Otherwise the function waits for the baking slot with
      {!bake_at_next_level}, feeds the resulting event to
      [State_transitions.step] and performs the action it returns. *)
let propose_at_next_level ~minimal_timestamp state =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  assert (Option.is_some state.level_state.elected_block) ;
  (* Strategy 1: inject immediately at the earliest possible round. *)
  let propose_immediately () =
    let* round, delegate =
      match
        Baking_scheduling.first_potential_round_at_next_level
          state
          ~earliest_round:Round.zero
      with
      | Some slot -> return slot
      | None -> cctxt#error "No potential baking slot for the given delegates."
    in
    let pool =
      Operation_worker.get_current_operations
        state.global_state.operation_worker
    in
    let block_to_bake : Baking_actions.block_to_bake =
      {
        Baking_actions.predecessor = state.level_state.latest_proposal.block;
        round;
        delegate;
        kind = Baking_actions.Fresh pool;
        force_apply = state.global_state.config.force_apply;
      }
    in
    let* injected_state =
      Baking_actions.perform_action
        ~state_recorder:(fun ~new_state ->
          Baking_state.may_record_new_state ~previous_state:state ~new_state)
        state
        (Inject_block {block_to_bake; updated_state = state})
    in
    let*! () =
      cctxt#message
        "Proposed block at round %a on top of %a "
        Round.pp
        block_to_bake.round
        Block_hash.pp
        block_to_bake.predecessor.hash
    in
    return injected_state
  in
  (* Strategy 2: wait for our slot, then let the automaton do the work. *)
  let propose_on_schedule () =
    let* event = bake_at_next_level state in
    let*! transition = State_transitions.step state event in
    let* injected_state = do_action transition in
    let*! () = cctxt#message "Proposal injected" in
    return injected_state
  in
  if minimal_timestamp then propose_immediately () else propose_on_schedule ()
(** [attestation_quorum state] returns [Some (power, attestations)], as
    computed by {!state_attesting_power}, when [power] reaches the
    [consensus_threshold] constant, and [None] otherwise. *)
let attestation_quorum state =
  let ((power, _) as quorum) = state_attesting_power state in
  let threshold =
    state.global_state.constants.parametric.consensus_threshold
  in
  if Compare.Int.(power < threshold) then None else Some quorum
(* Here's the sketch of the algorithm:
Do I have an attestation quorum for the current block or an elected block?
- Yes :: wait and propose at next level
- No ::
Is the current proposal at the right round?
- Yes :: fail propose
- No ::
Is there a preattestation quorum or does the last proposal contain a prequorum?
- Yes :: repropose block with right payload and preattestations for current round
- No :: repropose fresh block for current round *)
(** [propose cctxt ... delegates] makes one proposal attempt on behalf of
    [delegates] and returns unit on success.

    The optional fee, [force_apply], [force], [extra_operations] and
    [context_path] arguments are only forwarded to
    [Baking_configuration.make]. [minimal_timestamp] (default [false]) is
    forwarded to {!propose_at_next_level}.

    Depending on the freshly built state:
    - a block is already elected: propose at the next level;
    - an attestation quorum is found in the mempool: feed a
      [Quorum_reached] event to the automaton, then propose at the next
      level;
    - otherwise: repropose at the current round, unless the latest
      proposal is a valid proposal for the current level, in which case
      the command fails through [cctxt#error]. *)
let propose (cctxt : Protocol_client_context.full) ?minimal_fees
?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply ?force
?(minimal_timestamp = false) ?extra_operations ?context_path delegates =
let open Lwt_result_syntax in
let cache = Baking_cache.Block_cache.create 10 in
let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in
let config =
Baking_configuration.make
?minimal_fees
?minimal_nanotez_per_gas_unit
?minimal_nanotez_per_byte
?context_path
?force_apply
?force
?extra_operations
()
in
let* state = create_state cctxt ~config ~current_proposal delegates in
(* Make sure the operation worker is populated to avoid empty blocks
being proposed. *)
let* () =
Operation_worker.retrieve_pending_operations
cctxt
state.global_state.operation_worker
in
(* The state returned by the chosen branch is not needed afterwards:
only success or failure matters. *)
let* _ =
match state.level_state.elected_block with
| Some _ -> propose_at_next_level ~minimal_timestamp state
| None -> (
match attestation_quorum state with
| Some (_voting_power, attestation_qc) ->
(* Put the round state in the [Awaiting_attestations] phase before
feeding [Quorum_reached] to the automaton. *)
let state =
{
state with
round_state =
{
state.round_state with
current_phase = Baking_state.Awaiting_attestations;
};
}
in
let latest_proposal = state.level_state.latest_proposal.block in
let candidate =
{
Operation_worker.hash = latest_proposal.hash;
round_watched = latest_proposal.round;
payload_hash_watched = latest_proposal.payload_hash;
}
in
let* state =
let*! action =
State_transitions.step
state
(Baking_state.Quorum_reached (candidate, attestation_qc))
in
do_action action
(* this will register the elected block *)
in
propose_at_next_level ~minimal_timestamp state
| None -> (
(* No quorum: step the automaton with the bootstrap event, keeping
only the resulting state (the returned action is dropped). *)
let*? event = Baking_scheduling.compute_bootstrap_event state in
let*! state, _action = State_transitions.step state event in
let latest_proposal = state.level_state.latest_proposal in
let open State_transitions in
let round = state.round_state.current_round in
let*! proposal_acceptance =
is_acceptable_proposal_for_current_level state latest_proposal
in
match proposal_acceptance with
| Invalid | Outdated_proposal -> (
(* The latest proposal is not acceptable for the current level:
repropose if one of our delegates owns the current round. *)
match round_proposer state ~level:`Current round with
| Some {consensus_key_and_delegate; _} ->
let*! action =
State_transitions.propose_block_action
state
consensus_key_and_delegate
round
state.level_state.latest_proposal
in
let* state = do_action (state, action) in
let*! () =
cctxt#message
"Reproposed block at level %ld on round %a"
state.level_state.current_level
Round.pp
state.round_state.current_round
in
return state
| None -> cctxt#error "No slots for current round")
| Valid_proposal ->
cctxt#error
"Cannot propose: there's already a valid proposal for the \
current round %a"
Round.pp
round))
in
return_unit
(** [repropose cctxt ?force ?force_round delegates] reproposes a block at
    the current level on behalf of [delegates].

    The round used is [force_round] when given, and the current round of
    the bootstrapped state otherwise. The command fails through
    [cctxt#error] when the latest proposal is already a valid proposal for
    the current level, or when none of [delegates] is the proposer of the
    selected round.

    [force] is only forwarded to [Baking_configuration.make]. *)
let repropose (cctxt : Protocol_client_context.full) ?force ?force_round
    delegates =
  let open Lwt_result_syntax in
  let open Baking_state in
  let cache = Baking_cache.Block_cache.create 10 in
  let* _block_stream, current_proposal = get_current_proposal cctxt ~cache () in
  let config = Baking_configuration.make ?force () in
  let* state = create_state cctxt ~config ~current_proposal delegates in
  (* Make sure the operation worker is populated to avoid empty blocks
     being proposed. The call was missing here although the comment
     announced it; this mirrors what [propose] does. *)
  let* () =
    Operation_worker.retrieve_pending_operations
      cctxt
      state.global_state.operation_worker
  in
  (* Step the automaton with the bootstrap event, keeping only the
     resulting state (the returned action is dropped). *)
  let*? event = Baking_scheduling.compute_bootstrap_event state in
  let*! state, _action = State_transitions.step state event in
  let latest_proposal = state.level_state.latest_proposal in
  let open State_transitions in
  let round =
    match force_round with
    | Some x -> x
    | None -> state.round_state.current_round
  in
  let*! proposal_validity =
    is_acceptable_proposal_for_current_level state latest_proposal
  in
  match proposal_validity with
  | Invalid | Outdated_proposal -> (
      match Baking_state.round_proposer state ~level:`Current round with
      | Some {consensus_key_and_delegate; _} ->
          let*! action =
            State_transitions.propose_block_action
              state
              consensus_key_and_delegate
              round
              state.level_state.latest_proposal
          in
          let* state = do_action (state, action) in
          (* NOTE(review): the message prints the state's current round,
             which may differ from [round] when [force_round] is given --
             confirm which one is intended. *)
          let*! () =
            cctxt#message
              "Reproposed block at level %ld on round %a"
              state.level_state.current_level
              Round.pp
              state.round_state.current_round
          in
          return_unit
      | None -> cctxt#error "No slots for current round")
  | Valid_proposal ->
      cctxt#error
        "Cannot propose: there's already a valid proposal for the current \
         round %a"
        Round.pp
        round
(** [bake_using_automaton ~count config state heads_stream] runs the baking
    automaton from [state] until a new head proposal is seen whose level
    is at least the level of the latest proposal in [state] plus [count].

    On success the hash and level of that proposal are printed. Any other
    way for the loop to end is reported as
    ["Baking loop unexpectedly ended"]; errors raised inside the loop are
    propagated unchanged through [on_error]. *)
let bake_using_automaton ~count config state heads_stream =
  let open Lwt_result_syntax in
  let cctxt = state.global_state.cctxt in
  let* initial_event = first_automaton_event state in
  let start_level = state.level_state.latest_proposal.block.shell.level in
  let target_level = Int32.add start_level (Int32.of_int count) in
  let loop_state =
    Baking_scheduling.create_loop_state
      ~heads_stream
      state.global_state.operation_worker
  in
  (* Stop criterion: a head proposal at or above the target level. *)
  let reached_target_level = function
    | New_head_proposal proposal ->
        Compare.Int32.(proposal.block.shell.level >= target_level)
    | _ -> false
  in
  let* last_event =
    Baking_scheduling.automaton_loop
      ~stop_on_event:reached_target_level
      ~config
      ~on_error:(fun err -> Lwt.return (Error err))
      loop_state
      state
      initial_event
  in
  match last_event with
  | Some (New_head_proposal proposal) ->
      let*! () =
        cctxt#message
          "Last injected block: %a (level %ld)"
          Block_hash.pp
          proposal.block.hash
          proposal.block.shell.level
      in
      return_unit
  | Some _ | None -> cctxt#error "Baking loop unexpectedly ended"
(* attest the latest proposal and bake with it *)
(** [baking_minimal_timestamp ~count state block_stream] injects [count]
    blocks in a row without going through the automaton loop.

    For each block, the function:
    - checks that the delegates' own attestations plus the matching
      attestations found in the mempool reach [consensus_threshold], and
      fails through [cctxt#error] otherwise;
    - picks the first potential round at the next level;
    - signs the delegates' attestations and DAL attestations and adds them
      to the operation pool;
    - injects a fresh block built from that pool.

    When more blocks remain ([count > 1]), it waits on [block_stream] for
    a proposal at the new level, updates the state to that level and
    recurses with [pred count]. *)
let rec baking_minimal_timestamp ~count state
(block_stream : proposal Lwt_stream.t) =
let open Lwt_result_syntax in
let cctxt = state.global_state.cctxt in
let latest_proposal = state.level_state.latest_proposal in
let own_attestations =
State_transitions.make_consensus_list state latest_proposal
in
let current_mempool =
Operation_worker.get_current_operations state.global_state.operation_worker
in
(* Consensus contents of the mempool attestations that have the round and
level of the latest proposal. *)
let attestations_in_mempool =
Operation_pool.(
filter_attestations (Operation_set.elements current_mempool.consensus))
|> List.filter_map
(fun
({
protocol_data =
{contents = Single (Attestation consensus_content); _};
_;
} :
Kind.attestation operation)
->
if
Round.(consensus_content.round = latest_proposal.block.round)
&& Compare.Int32.(
Raw_level.to_int32 consensus_content.level
= latest_proposal.block.shell.level)
then Some consensus_content
else None)
in
(* Own attestations are prepended to the mempool ones;
[attestations_attesting_power] removes duplicates before summing. *)
let total_voting_power =
List.fold_left
(fun attestations own -> snd own :: attestations)
attestations_in_mempool
own_attestations
|> attestations_attesting_power state
in
let consensus_threshold =
state.global_state.constants.parametric.consensus_threshold
in
let* () =
if Compare.Int.(total_voting_power < consensus_threshold) then
cctxt#error
"Delegates do not have enough voting power. Only %d is available while \
%d is required."
total_voting_power
consensus_threshold
else return_unit
in
let* minimal_round, delegate =
match
Baking_scheduling.first_potential_round_at_next_level
state
~earliest_round:Round.zero
with
| None -> cctxt#error "No potential baking slot for the given delegates."
| Some first_potential_round -> return first_potential_round
in
let* signed_attestations =
Baking_actions.sign_attestations state own_attestations
in
let pool =
Operation_pool.add_operations
current_mempool
(List.map (fun (_, x, _, _) -> x) signed_attestations)
in
(* DAL attestations are requested for the level following the latest
proposal. *)
let attestation_level = Int32.succ latest_proposal.block.shell.level in
let* own_dal_attestations =
Baking_actions.get_dal_attestations state ~attestation_level
in
let* signed_dal_attestations =
Baking_actions.sign_dal_attestations state own_dal_attestations
in
let pool =
Operation_pool.add_operations
pool
(List.map
(fun (_delegate, op, _bitset, _published_level) -> op)
signed_dal_attestations)
in
let kind = Baking_actions.Fresh pool in
let block_to_bake : Baking_actions.block_to_bake =
{
Baking_actions.predecessor = latest_proposal.block;
round = minimal_round;
delegate;
kind;
force_apply = state.global_state.config.force_apply;
}
in
let state_recorder ~new_state =
Baking_state.may_record_new_state ~previous_state:state ~new_state
in
let* new_state =
Baking_actions.perform_action
~state_recorder
state
(Inject_block {block_to_bake; updated_state = state})
in
let*! () = cctxt#message "Injected block at minimal timestamp" in
if count <= 1 then return_unit
else
(* Drop every proposal from the stream until one at [attestation_level]
(the level just baked) is next. *)
let*! () =
Lwt_stream.junk_while_s
(fun proposal ->
Lwt.return
Compare.Int32.(
proposal.Baking_state.block.shell.level <> attestation_level))
block_stream
in
let*! next_level_proposal =
let*! r = Lwt_stream.get block_stream in
match r with
| None -> cctxt#error "Stream unexpectedly ended"
| Some b -> Lwt.return b
in
let*! new_state, action =
State_transitions.step new_state (New_head_proposal next_level_proposal)
in
let* new_state =
match action with
| Update_to_level update ->
(* The preattestation action returned by [update_to_level] is dropped
and the round state is reset before the next iteration. *)
let* new_state, _preattest_action =
Baking_actions.update_to_level new_state update
in
return
{
new_state with
round_state =
{
Baking_state.current_round = Round.zero;
current_phase = Idle;
delayed_prequorum = None;
};
}
| _ ->
(* Algorithmically, this will always be an update_to_level
action. *)
assert false
in
baking_minimal_timestamp ~count:(pred count) new_state block_stream
(** [bake cctxt ... delegates] bakes [count] blocks (default [1]) on behalf
    of [delegates].

    The optional fee, [force_apply], [force], [extra_operations],
    [context_path], [dal_node_endpoint] and [votes] arguments are only
    forwarded to [Baking_configuration.make].

    When [monitor_node_mempool] is set (default [true]), the pending
    operations are fetched into the operation worker before baking. With
    [minimal_timestamp] (default [false]) blocks are injected through
    {!baking_minimal_timestamp} and the state is created without
    synchronisation; otherwise baking goes through
    {!bake_using_automaton}. *)
let bake (cctxt : Protocol_client_context.full) ?minimal_fees
    ?minimal_nanotez_per_gas_unit ?minimal_nanotez_per_byte ?force_apply ?force
    ?(minimal_timestamp = false) ?extra_operations
    ?(monitor_node_mempool = true) ?context_path ?dal_node_endpoint ?(count = 1)
    ?votes delegates =
  let open Lwt_result_syntax in
  let config =
    Baking_configuration.make
      ?minimal_fees
      ?minimal_nanotez_per_gas_unit
      ?minimal_nanotez_per_byte
      ?context_path
      ?force_apply
      ?force
      ?extra_operations
      ?dal_node_endpoint
      ?votes
      ()
  in
  let cache = Baking_cache.Block_cache.create 10 in
  let* heads, current_proposal = get_current_proposal cctxt ~cache () in
  let synchronize = not minimal_timestamp in
  let* state =
    create_state
      cctxt
      ~monitor_node_mempool
      ~synchronize
      ~config
      ~current_proposal
      delegates
  in
  let* () =
    if monitor_node_mempool then
      (* Make sure the operation worker is populated to avoid empty
         blocks being baked *)
      Operation_worker.retrieve_pending_operations
        cctxt
        state.global_state.operation_worker
    else return_unit
  in
  if minimal_timestamp then baking_minimal_timestamp ~count state heads
  else bake_using_automaton ~count config state heads