daemon.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2022 Nomadic Labs, <contact@nomadic-labs.com> *)
(* Copyright (c) 2022 Marigold, <contact@marigold.dev> *)
(* Copyright (c) 2022 Oxhead Alpha <info@oxhead-alpha.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
open Protocol.Apply_results
open Protocol.Apply_internal_results
open Tezos_shell_services
open Protocol_client_context
open Protocol
open Alpha_context
open Error
let parse_tx_rollup_l2_address :
Script.node -> Protocol.Tx_rollup_l2_address.Indexable.value tzresult =
let open Protocol in
let open Micheline in
function
| Bytes (loc, bytes) (* As unparsed with [Optimized]. *) -> (
match Tx_rollup_l2_address.of_bytes_opt bytes with
| Some txa -> ok (Tx_rollup_l2_address.Indexable.value txa)
| None -> error (Error.Tx_rollup_invalid_l2_address loc))
| String (loc, str) (* As unparsed with [Readable]. *) -> (
match Tx_rollup_l2_address.of_b58check_opt str with
| Some txa -> ok (Tx_rollup_l2_address.Indexable.value txa)
| None -> error (Error.Tx_rollup_invalid_l2_address loc))
| Int (loc, _) | Prim (loc, _, _, _) | Seq (loc, _) ->
error (Error.Tx_rollup_invalid_l2_address loc)
let parse_ticketer : Script.node -> Contract.t tzresult =
let open Micheline in
function
| Bytes (_loc, bytes) (* As unparsed with [Optimized]. *) ->
Result.of_option ~error:[Wrong_deposit_parameters]
@@ Data_encoding.Binary.of_bytes_opt Contract.encoding bytes
| String (_loc, str) (* As unparsed with [Readable]. *) ->
Environment.wrap_tzresult @@ Contract.of_b58check str
| Int _ | Prim _ | Seq _ -> error Wrong_deposit_parameters
let parse_tx_rollup_deposit_parameters :
Script.expr ->
(Contract.t
* Script.expr
* Script.expr
* Protocol.Tx_rollup_l2_qty.t
* Protocol.Script_typed_ir.tx_rollup_l2_address)
tzresult =
fun parameters ->
let open Result_syntax in
let open Micheline in
let open Protocol in
(* /!\ This pattern matching needs to remain in sync with the deposit
parameters. See the transaction to Tx_rollup case in
Protocol.Apply.Apply.apply_internal_operation_contents *)
match root parameters with
| Seq
( _,
[
Prim
( _,
D_Pair,
[
Prim
( _,
D_Pair,
[ticketer; Prim (_, D_Pair, [contents; amount], _)],
_ );
bls;
],
_ );
ty;
] ) ->
let* destination = parse_tx_rollup_l2_address bls in
let* amount =
match amount with
| Int (_, v)
when Compare.Z.(Z.zero < v && v <= Z.of_int64 Int64.max_int) ->
ok @@ Tx_rollup_l2_qty.of_int64_exn (Z.to_int64 v)
| Int (_, invalid_amount) ->
error (Error.Tx_rollup_invalid_ticket_amount invalid_amount)
| _expr -> error Error.Tx_rollup_invalid_deposit
in
let* ticketer = parse_ticketer ticketer in
let ty = strip_locations ty in
let contents = strip_locations contents in
return (ticketer, ty, contents, amount, destination)
| _expr -> error Error.Tx_rollup_invalid_deposit
let extract_messages_from_block block_info rollup_id =
let managed_operation =
List.nth_opt
block_info.Alpha_block_services.operations
State.rollup_operation_index
in
let add_message_ticket (msg, _size) new_ticket (messages, tickets) =
let tickets =
match new_ticket with None -> tickets | Some ticket -> ticket :: tickets
in
(msg :: messages, tickets)
in
let get_messages_of_internal_operation ~source messages_tickets
(Internal_operation_result
( {
operation;
source = _use_the_source_of_the_external_operation;
nonce = _;
},
result )) =
match (operation, result) with
| ( Transaction
{amount = _; parameters; destination = Tx_rollup dst; entrypoint},
Applied
(ITransaction_result
(Transaction_to_tx_rollup_result {ticket_hash; _})) )
when Tx_rollup.equal dst rollup_id
&& Entrypoint.(entrypoint = Entrypoint.deposit) ->
(* Deposit message *)
( Option.bind (Data_encoding.force_decode parameters)
@@ fun parameters ->
parse_tx_rollup_deposit_parameters parameters |> Result.to_option )
|> Option.fold
~none:messages_tickets
~some:(fun (ticketer, ty, contents, amount, destination) ->
let deposit =
Tx_rollup_message.make_deposit
source
destination
ticket_hash
amount
in
add_message_ticket
deposit
(Some Ticket.{ticketer; ty; contents; hash = ticket_hash})
messages_tickets)
| _ -> messages_tickets
in
let get_messages :
type kind.
source:public_key_hash ->
kind manager_operation ->
kind manager_operation_result ->
packed_internal_operation_result list ->
Tx_rollup_message.t list * Ticket.t list ->
Tx_rollup_message.t list * Ticket.t list =
fun ~source op result internal_operation_results messages_tickets ->
let acc =
match (op, result) with
| ( Tx_rollup_submit_batch {tx_rollup; content; burn_limit = _},
Applied (Tx_rollup_submit_batch_result _) )
when Tx_rollup.equal rollup_id tx_rollup ->
(* Batch message *)
add_message_ticket
(Tx_rollup_message.make_batch content)
None
messages_tickets
| _, _ -> messages_tickets
in
(* Add messages from internal operations *)
List.fold_left
(get_messages_of_internal_operation ~source)
acc
internal_operation_results
in
let rec get_related_messages :
type kind.
Tx_rollup_message.t list * Ticket.t list ->
kind contents_and_result_list ->
Tx_rollup_message.t list * Ticket.t list =
fun acc -> function
| Single_and_result
( Manager_operation {operation; source; _},
Manager_operation_result
{operation_result; internal_operation_results; _} ) ->
get_messages
~source
operation
operation_result
internal_operation_results
acc
| Single_and_result (_, _) -> acc
| Cons_and_result
( Manager_operation {operation; source; _},
Manager_operation_result
{operation_result; internal_operation_results; _},
rest ) ->
let acc =
get_messages
~source
operation
operation_result
internal_operation_results
acc
in
get_related_messages acc rest
in
let finalize_receipt acc operation =
match Alpha_block_services.(operation.protocol_data, operation.receipt) with
| ( Operation_data {contents = operation_contents; _},
Receipt (Operation_metadata {contents = result_contents}) ) -> (
match kind_equal_list operation_contents result_contents with
| Some Eq ->
let operation_and_result =
pack_contents_list operation_contents result_contents
in
ok (get_related_messages acc operation_and_result)
| None ->
(* Should not happen *)
ok acc)
| _, Receipt No_operation_metadata | _, Empty | _, Too_large ->
error (Tx_rollup_no_operation_metadata operation.hash)
in
match managed_operation with
| None -> ok ([], [])
| Some managed_operations ->
let open Result_syntax in
let+ rev_messages, new_tickets =
List.fold_left_e finalize_receipt ([], []) managed_operations
in
(List.rev rev_messages, new_tickets)
let check_inbox state tezos_block level inbox =
let open Lwt_result_syntax in
trace (Error.Tx_rollup_cannot_check_inbox level)
@@ let* proto_inbox =
Protocol.Tx_rollup_services.inbox
state.State.cctxt
(state.State.cctxt#chain, `Hash (tezos_block, 0))
state.State.rollup_info.rollup_id
level
in
let*? protocol_inbox =
Result.of_option
~error:[Error.Tx_rollup_no_proto_inbox (level, tezos_block)]
proto_inbox
in
let reconstructed_inbox = Inbox.to_proto inbox in
fail_unless
Tx_rollup_inbox.(reconstructed_inbox = protocol_inbox)
(Error.Tx_rollup_inbox_mismatch
{level; reconstructed_inbox; protocol_inbox})
let commit_block_on_l1 state block =
match state.State.signers.operator with
| None -> return_unit
| Some operator ->
Committer.commit_block ~operator state.State.rollup_info.rollup_id block
let store_indexes ctxt contents =
let open Lwt_syntax in
let register_indexes_from_result ctxt result =
let indexes =
let open Protocol.Tx_rollup_l2_apply.Message_result in
match result with
| Deposit_result (Deposit_success indexes) -> indexes
| Batch_V1_result (Batch_result {indexes; _}) -> indexes
| _ -> assert false
in
List.fold_left_s
(fun ctxt (address, index) -> Context.register_address ctxt index address)
ctxt
indexes.address_indexes
in
List.fold_left_s
(fun ctxt Inbox.{result; _} ->
match result with
| Interpreted (result, _) -> register_indexes_from_result ctxt result
| Discarded _ -> return ctxt)
ctxt
contents
let process_messages_and_inboxes (state : State.t)
~(predecessor : L2block.t option) ?predecessor_context block_info =
let open Lwt_result_syntax in
let current_hash = block_info.Alpha_block_services.hash in
let*? messages, new_tickets =
extract_messages_from_block block_info state.State.rollup_info.rollup_id
in
let*! () = Event.(emit messages_application) (List.length messages) in
let* predecessor_context =
match predecessor_context with
| Some context -> return context
| None -> (
match predecessor with
| None ->
let*! ctxt = Context.init_context state.context_index in
return ctxt
| Some predecessor ->
Context.checkout state.context_index predecessor.header.context)
in
let parameters =
Protocol.Tx_rollup_l2_apply.
{
tx_rollup_max_withdrawals_per_batch =
state.constants.parametric.tx_rollup.max_withdrawals_per_batch;
}
in
let context = predecessor_context in
let* context, contents =
Interpreter.interpret_messages
context
parameters
~rejection_max_proof_size:
state.constants.parametric.tx_rollup.rejection_max_proof_size
messages
in
let* context =
List.fold_left_es
(fun context ticket ->
let* ticket_index =
Context.Ticket_index.get context ticket.Ticket.hash
in
match ticket_index with
| None ->
(* Can only happen if the interpretation of the corresponding deposit
fails (with an overflow on amounts or indexes). *)
return context
| Some ticket_index ->
let*! context =
Context.register_ticket context ticket_index ticket
in
return context)
context
new_tickets
in
match contents with
| None ->
(* No inbox at this block *)
return (`Old predecessor, predecessor_context)
| Some inbox ->
let*! context = store_indexes context inbox in
let*! context_hash = Context.commit context in
let level, predecessor_hash =
match predecessor with
| None -> (Tx_rollup_level.root, None)
| Some {hash; header = {level; _}; _} ->
(Tx_rollup_level.succ level, Some hash)
in
let* () = check_inbox state current_hash level inbox in
let commitment = Committer.commitment_of_inbox ~predecessor level inbox in
let header : L2block.header =
{
level;
tezos_block = current_hash;
predecessor = predecessor_hash;
context = context_hash;
commitment =
Tx_rollup_commitment.(Compact.hash (Full.compact commitment));
}
in
let hash = L2block.hash_header header in
let block = L2block.{hash; header; inbox; commitment} in
let*! () = State.save_block state block in
let*! () =
Event.(emit rollup_block) (header.level, hash, header.tezos_block)
in
return (`New block, context)
let set_head state head =
let open Lwt_result_syntax in
let* _l2_reorg = State.set_head state head in
let*! new_head_batcher = Batcher.new_head head in
match new_head_batcher with
| Error [No_batcher] -> return_unit
| Ok () -> return_unit
| Error _ as res -> Lwt.return res
let originated_in_block rollup_id block =
let check_origination_content_result : type kind. kind contents_result -> bool
= function
| Manager_operation_result
{
operation_result =
Applied (Tx_rollup_origination_result {originated_tx_rollup; _});
_;
} ->
Tx_rollup.(originated_tx_rollup = rollup_id)
| _ -> false
in
let rec check_origination_content_result_list :
type kind. kind contents_result_list -> bool = function
| Single_result x -> check_origination_content_result x
| Cons_result (x, xs) ->
check_origination_content_result x
|| check_origination_content_result_list xs
in
let manager_operations =
List.nth_opt
block.Alpha_block_services.operations
State.rollup_operation_index
in
let has_rollup_origination operation =
match operation.Alpha_block_services.receipt with
| Receipt (Operation_metadata {contents}) ->
check_origination_content_result_list contents
| Receipt No_operation_metadata | Empty | Too_large -> false
in
match manager_operations with
| None -> false
| Some ops -> List.exists has_rollup_origination ops
let rec process_block state current_hash =
let open Lwt_result_syntax in
let rollup_id = state.State.rollup_info.rollup_id in
let*! l2_block = State.tezos_block_already_processed state current_hash in
match l2_block with
| `Known maybe_l2_block ->
(* Already processed *)
let*! () = Event.(emit block_already_processed) current_hash in
let* () =
match maybe_l2_block with
| Some l2_block -> set_head state l2_block
| None -> return_unit
in
return (maybe_l2_block, None, [])
| `Unknown ->
state.State.sync.synchronized <- false ;
let* block_info = State.fetch_tezos_block state current_hash in
let predecessor_hash = block_info.header.shell.predecessor in
let block_level = block_info.header.shell.level in
let* () =
match state.State.rollup_info.origination_level with
| Some origination_level when block_level < origination_level ->
tzfail Tx_rollup_originated_in_fork
| _ -> return_unit
in
(* Handle predecessor Tezos block first *)
let*! () =
Event.(emit processing_block_predecessor)
(predecessor_hash, Int32.pred block_level)
in
let* l2_predecessor, predecessor_context, blocks_to_commit =
if originated_in_block rollup_id block_info then
let*! () =
Event.(emit detected_origination) (rollup_id, current_hash)
in
let* () =
State.set_rollup_info state rollup_id ~origination_level:block_level
in
return (None, None, [])
else process_block state predecessor_hash
in
let*! () =
Event.(emit processing_block) (current_hash, predecessor_hash)
in
let* l2_block, context =
process_messages_and_inboxes
state
~predecessor:l2_predecessor
?predecessor_context
block_info
in
let blocks_to_commit =
match l2_block with
| `Old _ -> blocks_to_commit
| `New l2_block -> l2_block :: blocks_to_commit
in
let*! () =
let maybe_l2_block_hash =
match l2_block with
| `Old None -> None
| `Old (Some l2_block) | `New l2_block -> Some l2_block.hash
in
State.save_tezos_block_info
state
current_hash
maybe_l2_block_hash
~level:block_info.header.shell.level
~predecessor:block_info.header.shell.predecessor
in
let* l2_block =
match l2_block with
| `Old None -> return_none
| `Old (Some l2_block) | `New l2_block ->
let* () = set_head state l2_block in
return_some l2_block
in
State.notify_processed_tezos_level state block_info.header.shell.level ;
let*! () =
Event.(emit tezos_block_processed) (current_hash, block_level)
in
return (l2_block, Some context, blocks_to_commit)
let batch () = if Batcher.active () then Batcher.batch () else return_unit
let notify_head state head reorg =
let open Lwt_result_syntax in
let* head = State.fetch_tezos_block state head in
let*! () = Injector.new_tezos_head head reorg in
return_unit
let queue_gc_operations state =
let open Lwt_result_syntax in
let tx_rollup = state.State.rollup_info.rollup_id in
let queue_finalize_commitment state =
match state.State.signers.finalize_commitment with
| None -> return_unit
| Some source ->
let* _hash =
Injector.add_pending_operation
~source
(Tx_rollup_finalize_commitment {tx_rollup})
in
return_unit
in
let queue_remove_commitment state =
match state.State.signers.remove_commitment with
| None -> return_unit
| Some source ->
let* _hash =
Injector.add_pending_operation
~source
(Tx_rollup_remove_commitment {tx_rollup})
in
return_unit
in
let* () = queue_finalize_commitment state in
queue_remove_commitment state
let dispatch_withdrawals_on_l1 state level =
let open Lwt_result_syntax in
match state.State.signers.dispatch_withdrawals with
| None -> return_unit
| Some source -> (
let*! block = State.get_level_l2_block state level in
match block with
| None -> return_unit
| Some block -> Dispatcher.dispatch_withdrawals ~source state block)
let reject_bad_commitment state commitment =
let open Lwt_result_syntax in
match state.State.signers.rejection with
| None -> return_unit
| Some source -> Accuser.reject_bad_commitment ~source state commitment
let fail_when_slashed (type kind) state l1_operation
(result : kind manager_operation_result) =
let open Lwt_result_syntax in
let open Apply_results in
match state.State.signers.operator with
| None -> return_unit
| Some operator -> (
(* This function handles external operations only. Internal operations have
to be handled in [handle] in [handle_l1_operation] below. *)
match result with
| Applied result ->
let balance_updates =
match result with
| Tx_rollup_commit_result {balance_updates; _}
| Tx_rollup_rejection_result {balance_updates; _} ->
(* These are the only two operations which can slash a bond. *)
balance_updates
| _ -> []
in
let frozen_debit, punish =
List.fold_left
(fun (frozen_debit, punish) -> function
| Receipt.(Tx_rollup_rejection_punishments, Credited _, _) ->
(* Someone was punished *)
(frozen_debit, true)
| Frozen_bonds (committer, _), Debited _, _
when Contract.(committer = Implicit operator) ->
(* Our frozen bonds are gone *)
(true, punish)
| _ -> (frozen_debit, punish))
(false, false)
balance_updates
in
fail_when
(frozen_debit && punish)
(Error.Tx_rollup_deposit_slashed l1_operation)
| _ -> return_unit)
let process_op (type kind) (state : State.t) l1_block l1_operation ~source:_
(op : kind manager_operation) (result : kind manager_operation_result)
(acc : 'acc) : 'acc tzresult Lwt.t =
let open Lwt_result_syntax in
let is_my_rollup tx_rollup =
Tx_rollup.equal state.rollup_info.rollup_id tx_rollup
in
let* () = fail_when_slashed state l1_operation result in
(* This function handles external operations only. Internal operations have
to be handled in [handle] in [handle_l1_operation] below. *)
match (op, result) with
| ( Tx_rollup_commit {commitment; tx_rollup},
Applied (Tx_rollup_commit_result _) )
when is_my_rollup tx_rollup ->
let commitment_hash =
Tx_rollup_commitment.(Compact.hash (Full.compact commitment))
in
let*! () =
State.set_commitment_included
state
commitment_hash
l1_block
l1_operation
in
let* () = reject_bad_commitment state commitment in
return acc
| ( Tx_rollup_finalize_commitment {tx_rollup},
Applied (Tx_rollup_finalize_commitment_result {level; _}) )
when is_my_rollup tx_rollup ->
let* () = dispatch_withdrawals_on_l1 state level in
State.set_finalized_level state level
| _, _ -> return acc
let rollback_op (type kind) (state : State.t) _l1_block _l1_operation ~source:_
(op : kind manager_operation) (result : kind manager_operation_result)
(acc : 'acc) : 'acc tzresult Lwt.t =
let open Lwt_result_syntax in
let is_my_rollup tx_rollup =
Tx_rollup.equal state.rollup_info.rollup_id tx_rollup
in
(* This function handles external operations only. Internal operations have
to be handled in [handle] in [handle_l1_operation] below. *)
match (op, result) with
| ( Tx_rollup_commit {commitment; tx_rollup},
Applied (Tx_rollup_commit_result _) )
when is_my_rollup tx_rollup ->
let commitment_hash =
Tx_rollup_commitment.(Compact.hash (Full.compact commitment))
in
let*! () = State.unset_commitment_included state commitment_hash in
return acc
| ( Tx_rollup_finalize_commitment {tx_rollup},
Applied (Tx_rollup_finalize_commitment_result {level; _}) )
when is_my_rollup tx_rollup -> (
match Tx_rollup_level.pred level with
| None ->
let*! () = State.delete_finalized_level state in
return_unit
| Some level -> State.set_finalized_level state level)
| _, _ -> return acc
let handle_l1_operation direction (block : Alpha_block_services.block_info)
state acc (operation : Alpha_block_services.operation) =
let open Lwt_result_syntax in
let handle_op =
match direction with `Rollback -> rollback_op | `Process -> process_op
in
let handle :
type kind.
source:public_key_hash ->
kind manager_operation ->
kind manager_operation_result ->
packed_internal_operation_result list ->
'acc ->
'acc tzresult Lwt.t =
fun ~source op result _internal_operation_results acc ->
handle_op state ~source block.hash operation.hash op result acc
(* There are no messages to handle for internal operations for now. *)
in
let rec handle_list :
type kind. 'acc -> kind contents_and_result_list -> 'acc tzresult Lwt.t =
fun acc -> function
| Single_and_result
( Manager_operation {operation; source; _},
Manager_operation_result
{operation_result; internal_operation_results; _} ) ->
handle ~source operation operation_result internal_operation_results acc
| Single_and_result (_, _) -> return acc
| Cons_and_result
( Manager_operation {operation; source; _},
Manager_operation_result
{operation_result; internal_operation_results; _},
rest ) ->
let* acc =
handle
~source
operation
operation_result
internal_operation_results
acc
in
handle_list acc rest
in
match (operation.protocol_data, operation.receipt) with
| _, Receipt No_operation_metadata | _, Empty | _, Too_large ->
fail [Tx_rollup_no_operation_metadata operation.hash]
| ( Operation_data {contents = operation_contents; _},
Receipt (Operation_metadata {contents = result_contents}) ) -> (
match kind_equal_list operation_contents result_contents with
| None ->
let*! () = Debug_events.(emit should_not_happen) __LOC__ in
return acc
| Some Eq ->
let operation_and_result =
pack_contents_list operation_contents result_contents
in
handle_list acc operation_and_result)
let handle_l1_block direction state acc block =
List.fold_left_es
(List.fold_left_es (handle_l1_operation direction block state))
acc
block.Alpha_block_services.operations
let handle_l1_reorg state acc reorg =
let open Lwt_result_syntax in
let* acc =
List.fold_left_es
(handle_l1_block `Rollback state)
acc
(List.rev reorg.Injector_common.old_chain)
in
let* acc =
List.fold_left_es
(handle_l1_block `Process state)
acc
reorg.Injector_common.new_chain
in
return acc
let notify_synchronized state =
let old_value = state.State.sync.synchronized in
state.State.sync.synchronized <- true ;
if old_value = false then
Lwt_condition.broadcast state.State.sync.on_synchronized ()
let process_head ?(notify_sync = true) state
(current_hash, (current_header : Tezos_base.Block_header.t option)) =
let open Lwt_result_syntax in
(if notify_sync then
match current_header with
| None -> ()
| Some current_header ->
State.set_known_tezos_level state current_header.shell.level) ;
let*! () = Event.(emit new_block) current_hash in
let* _, _, blocks_to_commit = process_block state current_hash in
let* l1_reorg = State.set_tezos_head state current_hash in
if notify_sync then notify_synchronized state ;
let* () = handle_l1_reorg state () l1_reorg in
let* () = List.iter_es (commit_block_on_l1 state) blocks_to_commit in
let* () = batch () in
let* () = queue_gc_operations state in
let* () = notify_head state current_hash l1_reorg in
let*! () =
match current_header with
| None -> Lwt.return_unit
| Some header -> Injector.inject ~header ()
in
return_unit
let look_for_origination_block state block_list =
let open Lwt_result_syntax in
let rollup_id = state.State.rollup_info.rollup_id in
state.State.sync.synchronized <- false ;
let rec loop = function
| [] -> return_none
| block_hash :: rest as block_list ->
let* block = State.fetch_tezos_block state block_hash in
let*! () =
Event.(emit look_for_origination)
(block.hash, block.header.shell.level)
in
if originated_in_block rollup_id block then return_some block_list
else (
State.notify_processed_tezos_level state block.header.shell.level ;
loop rest)
in
loop block_list
let catch_up_on_commitments state =
let open Lwt_result_syntax in
let*! () = Event.(emit catch_up_commitments) () in
let* proto_rollup_state =
Protocol.Tx_rollup_services.state
state.State.cctxt
(state.State.cctxt#chain, `Head 0)
state.State.rollup_info.rollup_id
and* tezos_head =
Shell_services.Blocks.Header.shell_header
state.State.cctxt
~chain:state.State.cctxt#chain
~block:(`Head 0)
()
in
(* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2957
We have to serialize the state to access the required information *)
let proto_rollup_state =
let open Data_encoding.Binary in
to_bytes_exn Tx_rollup_state.encoding proto_rollup_state
|> of_bytes_exn Tx_rollup_state_repr.encoding
in
let next_commitment_level =
match
Tx_rollup_state_repr.next_commitment_level
proto_rollup_state
(* Next commitment will be included in next block *)
Raw_level_repr.(succ @@ of_int32_exn tezos_head.level)
with
| Ok l -> Some l
| Error _ ->
(* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2957
We assume the error is Tx_rollup_errors.No_uncommitted_inbox as we
cannot match on it. *)
None
in
match next_commitment_level with
| None -> return_unit
| Some next_commitment_level ->
let rec missing_commitments to_commit block =
let open Lwt_syntax in
match block with
| None -> return to_commit
| Some ({L2block.header = {level; _}; _} as block) ->
if
Tx_rollup_level.to_int32 level
< Tx_rollup_level_repr.to_int32 next_commitment_level
then
(* We have iterated over all missing commitments *)
return to_commit
else
let*! predecessor =
Option.filter_map_s
(State.get_block state)
block.header.predecessor
in
missing_commitments (block :: to_commit) predecessor
in
let head = State.get_head state in
let*! to_commit = missing_commitments [] head in
List.iter_es (commit_block_on_l1 state) to_commit
let catch_up_on_blocks (state : State.t) origination_level =
let open Lwt_result_syntax in
let* head = Alpha_block_services.Header.shell_header state.cctxt () in
let* last_tezos_block = State.get_tezos_head state in
let first_handle_level =
match last_tezos_block with
| Some b -> Some (Int32.succ b.header.shell.level)
| None -> origination_level
in
match first_handle_level with
| None -> return_unit
| Some first_handle_level ->
let missing_levels =
Int32.to_int head.level - Int32.to_int first_handle_level + 1
in
let*! () = Event.(emit missing_blocks) missing_levels in
if missing_levels <= 0 then return_unit
else
let* missing_blocks =
Chain_services.Blocks.list state.cctxt ~length:missing_levels ()
in
let missing_blocks =
match missing_blocks with
| missing_blocks :: _ -> List.rev missing_blocks
| [] -> []
in
State.set_known_tezos_level state head.level ;
let* missing_blocks =
match State.get_head state with
| Some _ -> return missing_blocks
| None -> (
(* No L2 blocks processed yet, look for origination first *)
let* missing_blocks =
look_for_origination_block state missing_blocks
in
match missing_blocks with
| None -> tzfail Tx_rollup_originated_in_fork
| Some missing_blocks -> return missing_blocks)
in
let+ () =
List.iter_es
(fun block ->
let*! res = process_head ~notify_sync:false state (block, None) in
match res with
| Error (Tx_rollup_originated_in_fork :: _) -> return_unit
| _ -> Lwt.return res)
missing_blocks
in
notify_synchronized state
let catch_up state =
let open Lwt_result_syntax in
let* () = catch_up_on_commitments state in
catch_up_on_blocks state state.State.rollup_info.origination_level
(* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/2958
We may also need to catch up on finalization/removal of commitments here. *)
let check_operator_deposit state config =
let open Lwt_result_syntax in
match state.State.signers.operator with
| None ->
(* No operator for this node, no commitments will be made. *)
return_unit
| Some operator ->
let* has_deposit =
Plugin.RPC.Tx_rollup.has_bond
state.State.cctxt
(state.State.cctxt#chain, `Head 0)
state.State.rollup_info.rollup_id
operator
in
if has_deposit then
(* The operator already has a deposit for this rollup, no other check
necessary. *)
return_unit
else
(* Operator never made a deposit for this rollup, ensure they are ready to
make one. *)
fail_unless
config.Node_config.allow_deposit
Error.Tx_rollup_deposit_not_allowed
let main_exit_callback state rpc_server _exit_status =
let open Lwt_syntax in
let* () = state.State.cctxt#message "Stopping RPC server ..." in
let* () = RPC_server.shutdown rpc_server in
let* () = state.State.cctxt#message "Stopping injector ..." in
let* () = Injector.shutdown () in
let* () = state.State.cctxt#message "Stopping batcher ..." in
let* () = Batcher.shutdown () in
let* () = state.State.cctxt#message "Closing stores ..." in
let* () = Stores.close state.State.stores in
let* () = state.State.cctxt#message "Closing context ..." in
let* () = Context.close state.State.context_index in
let* () = state.State.cctxt#message "Shutting down" in
return_unit
let rec connect ~delay cctxt =
let open Lwt_syntax in
let* res = Monitor_services.heads cctxt cctxt#chain in
match res with
| Ok (stream, stopper) -> return_ok (stream, stopper)
| Error _ ->
let* () = Event.(emit cannot_connect) delay in
let* () = Lwt_unix.sleep delay in
connect ~delay cctxt
let is_connection_error trace =
List.exists
(function
| RPC_client_errors.(Request_failed {error = Connection_failed _; _}) ->
true
| _ -> false)
trace
(* TODO/TORU: https://gitlab.com/tezos/tezos/-/issues/1845
Clean exit *)
let run configuration cctxt =
let open Lwt_result_syntax in
let*! () = Event.(emit starting_node) () in
let {Node_config.signers; reconnection_delay; rollup_id; batch_burn_limit; _}
=
configuration
in
let* state = State.init cctxt configuration in
let* () = check_operator_deposit state configuration in
let* () =
Injector.init
state.cctxt
~data_dir:configuration.data_dir
state
~signers:
(List.filter_map
(function
| None, _, _ -> None
| Some x, strategy, tags -> Some (x, strategy, tags))
[
(signers.operator, `Each_block, [Injector.Commitment]);
(* Batches of L2 operations are submitted with a delay after each
block, to allow for more operations to arrive and be included in
the following block. *)
(signers.submit_batch, `Delay_block 0.5, [Submit_batch]);
(signers.finalize_commitment, `Each_block, [Finalize_commitment]);
(signers.remove_commitment, `Each_block, [Remove_commitment]);
(signers.rejection, `Each_block, [Rejection]);
(signers.dispatch_withdrawals, `Each_block, [Dispatch_withdrawals]);
])
in
let* () =
Option.iter_es
(fun signer ->
Batcher.init
~rollup:rollup_id
~signer
~batch_burn_limit
state.State.context_index
state.State.constants)
signers.submit_batch
in
let* rpc_server = RPC.start_server configuration state in
let _ =
(* Register cleaner callback *)
Lwt_exit.register_clean_up_callback
~loc:__LOC__
(main_exit_callback state rpc_server)
in
let*! () = Event.(emit node_is_ready) () in
let* () = catch_up state in
let rec loop () =
let* () =
Lwt.catch
(fun () ->
let* block_stream, interupt =
connect ~delay:reconnection_delay cctxt
in
let*! () =
Lwt_stream.iter_s
(fun (head, header) ->
let*! r = process_head state (head, Some header) in
match r with
| Ok _ -> Lwt.return ()
| Error trace when is_connection_error trace ->
Format.eprintf
"@[<v 2>Connection error:@ %a@]@."
pp_print_trace
trace ;
interupt () ;
Lwt.return ()
| Error e ->
Format.eprintf "%a@.Exiting.@." pp_print_trace e ;
Lwt_exit.exit_and_raise 1)
block_stream
in
let*! () = Event.(emit connection_lost) () in
loop ())
fail_with_exn
in
Lwt_utils.never_ending ()
in
loop ()