(* Revision e8af0329db79f45c4667a5eebfa67ac36687c954 authored by Sebastien Mondet on 26 November 2019, 22:09:35 UTC, committed by Benjamin Canou on 30 December 2019, 16:03:01 UTC
   1 parent eaba490
   Raw File
   mempool_worker.ml *)
(*****************************************************************************)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** Configuration of a mempool worker. Its single field holds the limits that
    are handed to the underlying generic worker when it is launched. *)
type limits = {worker_limits : Worker_types.limits}

module type T = sig
  (** Protocol whose operations are handled by this mempool worker. *)
  module Proto : Registered_protocol.T

  (** A mempool worker. *)
  type t

  (** An operation together with its hash and its decoded protocol-specific
      data. The type is private: outside of the implementation, values can be
      inspected but can only be obtained through {!parse}. *)
  type operation = private {
    hash : Operation_hash.t;
    raw : Operation.t;
    protocol_data : Proto.operation_data;
  }

  (** Verdict returned by {!validate} for an operation. [Applied] carries the
      receipt produced by the protocol; [Branch_delayed], [Branch_refused] and
      [Refused] carry the errors reported when applying the operation;
      [Duplicate] and [Not_in_branch] carry no payload. *)
  type result =
    | Applied of Proto.operation_receipt
    | Branch_delayed of error list
    | Branch_refused of error list
    | Refused of error list
    | Duplicate
    | Not_in_branch

  (** Encoding of {!result}. *)
  val result_encoding : result Data_encoding.t

  (** [create limits chain_db] launches a mempool worker for the chain served
      by [chain_db]. *)
  val create : limits -> Distributed_db.chain_db -> t tzresult Lwt.t

  (** [shutdown t] shuts the worker [t] down. *)
  val shutdown : t -> unit Lwt.t

  (** [parse raw_op] decodes the protocol-specific part of [raw_op]. It does
      not take a worker and does not validate anything; the implementation
      below memoizes both successful and failed parses. *)
  val parse : Operation.t -> operation tzresult

  (** [validate t op] asks the worker [t] to validate the parsed operation
      [op] and waits for its verdict. *)
  val validate : t -> operation -> result tzresult Lwt.t

  (** [chain_db t] is the distributed database handle [t] was created with. *)
  val chain_db : t -> Distributed_db.chain_db

  (** RPC directory exposing the operations known to a worker (pending
      operations and a monitoring stream). *)
  val rpc_directory : t RPC_directory.t
end

(** Compile-time configuration of {!Make}. *)
module type STATIC = sig
  (** Size used for both the hash table and the ring of the parsed-operation
      cache. *)
  val max_size_parsed_cache : int
end

module Make (Static : STATIC) (Proto : Registered_protocol.T) :
  T with module Proto = Proto = struct
  (* Re-export of the functor argument, as required by the
     [T with module Proto = Proto] constraint. *)
  module Proto = Proto

  (* Protocol-specific block services. Used by the RPC handlers at the end of
     this functor for the mempool record and the service definitions. *)
  module Proto_services = Block_services.Make (Proto) (Proto)

  (* A parsed operation: the raw operation, its hash, and the protocol data
     decoded from the raw operation's [proto] bytes (see [parse_helper]). *)
  type operation = {
    hash : Operation_hash.t;
    raw : Operation.t;
    protocol_data : Proto.operation_data;
  }

  (* Verdict of the validation of one operation; [apply_operation] shows how
     each case is produced. *)
  type result =
    | Applied of Proto.operation_receipt
    | Branch_delayed of error list
    | Branch_refused of error list
    | Refused of error list
    | Duplicate
    | Not_in_branch

  (* Encoding of [result]: a tagged union with one case per constructor,
     tags 0 to 5 following the declaration order of the type. *)
  let result_encoding =
    let open Data_encoding in
    (* Case for a constructor carrying an error list under the "error" key. *)
    let errors_case tag title select build =
      case
        (Tag tag)
        ~title
        (obj1 (req "error" (list Error_monad.error_encoding)))
        select
        build
    in
    (* Case for a constant constructor, encoded as the empty object. *)
    let constant_case tag title select value =
      case (Tag tag) ~title empty select (fun () -> value)
    in
    union
      [ case
          (Tag 0)
          ~title:"Applied"
          (obj1 (req "receipt" Proto.operation_receipt_encoding))
          (function Applied receipt -> Some receipt | _ -> None)
          (fun receipt -> Applied receipt);
        errors_case
          1
          "Branch Delayed"
          (function Branch_delayed errors -> Some errors | _ -> None)
          (fun errors -> Branch_delayed errors);
        errors_case
          2
          "Branch Refused"
          (function Branch_refused errors -> Some errors | _ -> None)
          (fun errors -> Branch_refused errors);
        errors_case
          3
          "Refused"
          (function Refused errors -> Some errors | _ -> None)
          (fun errors -> Refused errors);
        constant_case
          4
          "Duplicate"
          (function Duplicate -> Some () | _ -> None)
          Duplicate;
        constant_case
          5
          "Not_in_branch"
          (function Not_in_branch -> Some () | _ -> None)
          Not_in_branch ]

  (* Prints a short lowercase label naming the constructor of the verdict;
     payloads (receipt, errors) are not printed. *)
  let pp_result ppf verdict =
    let label =
      match verdict with
      | Applied _ ->
          "applied"
      | Branch_delayed _ ->
          "branch delayed"
      | Branch_refused _ ->
          "branch refused"
      | Refused _ ->
          "refused"
      | Duplicate ->
          "duplicate"
      | Not_in_branch ->
          "not in branch"
    in
    Format.pp_print_string ppf label

  (* Encoding of a parsed operation as an object with the three fields
     "hash", "raw" and "protocol_data". *)
  let operation_encoding =
    let open Data_encoding in
    let fields_of_operation op = (op.hash, op.raw, op.protocol_data) in
    let operation_of_fields (hash, raw, protocol_data) =
      {hash; raw; protocol_data}
    in
    let fields_encoding =
      obj3
        (req "hash" Operation_hash.encoding)
        (req "raw" Operation.encoding)
        (req "protocol_data" Proto.operation_data_encoding)
    in
    conv fields_of_operation operation_of_fields fields_encoding

  (* Legacy logging section for this worker, registered under the name
     "node.mempool_validator". *)
  module Log = Internal_event.Legacy_logging.Make (struct
    let name = "node.mempool_validator"
  end)

  (* Naming of worker instances: a worker is identified by the id of the
     chain it serves. *)
  module Name = struct
    type t = Chain_id.t

    let encoding = Chain_id.encoding

    (* Base path of the worker's name; its last component is the printed hash
       of the protocol this functor was instantiated with. *)
    let base =
      (* [Format.asprintf] formats into a private buffer, so the shared
         global [Format.str_formatter] is neither read nor clobbered. *)
      let proto_hash = Format.asprintf "%a" Protocol_hash.pp Proto.hash in
      ["node"; "mempool"; "worker"; proto_hash]

    let pp = Chain_id.pp_short
  end

  (* Requests understood by the worker. The only request validates a parsed
     operation and is answered with a [result]. *)
  module Request = struct
    type 'a t = Validate : operation -> result t [@@ocaml.unboxed]

    (* A request with its answer type hidden, for logging purposes. *)
    type view = View : _ t -> view

    let view req = View req

    (* A request view is encoded as the operation it carries. *)
    let encoding =
      let operation_of_view (View (Validate op)) = op in
      let view_of_operation op = View (Validate op) in
      Data_encoding.conv
        operation_of_view
        view_of_operation
        operation_encoding

    let pp ppf (View (Validate op)) =
      Format.fprintf ppf "Validating new operation %a" Operation_hash.pp op.hash
  end

  (* Events recorded by the worker: free-form debug messages and request
     reports, the latter optionally carrying the errors of a failed request. *)
  module Event = struct
    type t =
      | Request of
          (Request.view * Worker_types.request_status * error list option)
      | Debug of string

    (* Debug messages are logged at debug level, request reports at info. *)
    let level = function
      | Debug _ ->
          Internal_event.Debug
      | Request _ ->
          Internal_event.Info

    (* Tagged union: 0 for debug messages, 1 for successful requests,
       2 for failed requests. *)
    let encoding =
      let open Data_encoding in
      let debug_case =
        case
          (Tag 0)
          ~title:"Debug"
          (obj1 (req "message" string))
          (function Debug msg -> Some msg | _ -> None)
          (fun msg -> Debug msg)
      in
      let request_case =
        case
          (Tag 1)
          ~title:"Request"
          (obj2
             (req "request" Request.encoding)
             (req "status" Worker_types.request_status_encoding))
          (function
            | Request (view, status, None) -> Some (view, status) | _ -> None)
          (fun (view, status) -> Request (view, status, None))
      in
      let failed_request_case =
        case
          (Tag 2)
          ~title:"Failed request"
          (obj3
             (req "error" RPC_error.encoding)
             (req "failed_request" Request.encoding)
             (req "status" Worker_types.request_status_encoding))
          (function
            | Request (view, status, Some errs) ->
                Some (errs, view, status)
            | _ ->
                None)
          (fun (errs, view, status) -> Request (view, status, Some errs))
      in
      union [debug_case; request_case; failed_request_case]

    let pp ppf event =
      match event with
      | Debug msg ->
          Format.fprintf ppf "%s" msg
      | Request (view, status, None) ->
          Format.fprintf
            ppf
            "@[<v 0>%a@, %a@]"
            Request.pp
            view
            Worker_types.pp_status
            status
      | Request (view, status, Some errors) ->
          Format.fprintf
            ppf
            "@[<v 0>%a@, %a, %a@]"
            Request.pp
            view
            Worker_types.pp_status
            status
            (Format.pp_print_list Error_monad.pp)
            errors
  end

  (* Memoization cache for parsed operations. Parse results, failures
     included, are stored in a hash table; a ring of hashes decides which
     table entry is dropped when a new one is added. *)
  module ParsedCache = struct
    type t = {
      table : operation tzresult Operation_hash.Table.t;
      ring : Operation_hash.t Ring.t;
    }

    (* Both the table and the ring are sized by
       [Static.max_size_parsed_cache]. *)
    let create () : t =
      let capacity = Static.max_size_parsed_cache in
      {
        table = Operation_hash.Table.create capacity;
        ring = Ring.create capacity;
      }

    (* Records [parsed_op] under the hash of [raw_op]. The table entry of the
       hash pushed out of the ring, if any, is removed first. *)
    let add t raw_op parsed_op =
      let hash = Operation.hash raw_op in
      ( match Ring.add_and_return_erased t.ring hash with
      | Some erased ->
          Operation_hash.Table.remove t.table erased
      | None ->
          () ) ;
      Operation_hash.Table.replace t.table hash parsed_op

    (* Looks up the memoized parse of [raw_op], keyed by its hash. *)
    let find_opt t raw_op =
      Operation_hash.Table.find_opt t.table (Operation.hash raw_op)

    (* NOTE: hashes are not removed from the ring. As a result, the cache size
     * bound can be lowered. This is a non-issue because it's only a cache. *)
    let rem t hash = Operation_hash.Table.remove t.table hash
  end

  (* Memoization cache for validated operations: maps an operation hash to
     the verdict of its validation and the raw operation. *)
  module ValidatedCache = struct
    type t = (result * Operation.t) Operation_hash.Table.t

    let encoding =
      let open Data_encoding in
      Operation_hash.Table.encoding (tup2 result_encoding Operation.encoding)

    (* Prints one "Operation <hash>: <verdict>" item per entry, calling
       [break ppf] after each of them. *)
    let pp break ppf table =
      let open Format in
      Operation_hash.Table.iter
        (fun h (r, _) ->
          fprintf ppf "Operation %a: %a" Operation_hash.pp_short h pp_result r ;
          break ppf)
        table

    let create () = Operation_hash.Table.create 1000

    (* [add t parsed_op result] binds the hash of [parsed_op] to [result],
       which is a (verdict, raw operation) pair, replacing any previous
       binding. *)
    let add t parsed_op result =
      Operation_hash.Table.replace t parsed_op.hash result

    let find_opt t parsed_op = Operation_hash.Table.find_opt t parsed_op.hash

    let iter f t = Operation_hash.Table.iter f t

    (* Builds the protocol-level mempool record from the cache. [Applied],
       [Branch_refused], [Branch_delayed] and [Refused] entries go to the
       field of the same name; [Duplicate] and [Not_in_branch] entries are
       left out, and [unprocessed] is always empty.
       Raises whatever [Data_encoding.Binary.of_bytes_exn] raises if the
       protocol data of a reported entry cannot be decoded. *)
    let to_mempool t =
      let empty =
        {
          Proto_services.Mempool.applied = [];
          refused = Operation_hash.Map.empty;
          branch_refused = Operation_hash.Map.empty;
          branch_delayed = Operation_hash.Map.empty;
          unprocessed = Operation_hash.Map.empty;
        }
      in
      let map_op op =
        let protocol_data =
          Data_encoding.Binary.of_bytes_exn
            Proto.operation_data_encoding
            op.Operation.proto
        in
        {Proto.shell = op.shell; protocol_data}
      in
      Operation_hash.Table.fold
        (fun hash (result, raw_op) acc ->
          (* The raw operation is decoded inside each reporting branch, so
             that entries which are dropped cost no deserialization. *)
          match result with
          | Applied _ ->
              {
                acc with
                Proto_services.Mempool.applied =
                  (hash, map_op raw_op) :: acc.Proto_services.Mempool.applied;
              }
          | Branch_refused err ->
              {
                acc with
                Proto_services.Mempool.branch_refused =
                  Operation_hash.Map.add
                    hash
                    (map_op raw_op, err)
                    acc.Proto_services.Mempool.branch_refused;
              }
          | Branch_delayed err ->
              {
                acc with
                Proto_services.Mempool.branch_delayed =
                  Operation_hash.Map.add
                    hash
                    (map_op raw_op, err)
                    acc.Proto_services.Mempool.branch_delayed;
              }
          | Refused err ->
              {
                acc with
                Proto_services.Mempool.refused =
                  Operation_hash.Map.add
                    hash
                    (map_op raw_op, err)
                    acc.Proto_services.Mempool.refused;
              }
          (* Listed explicitly so that a new verdict constructor is flagged
             by the exhaustiveness check instead of being silently dropped. *)
          | Duplicate | Not_in_branch ->
              acc)
        t
        empty

    let clear t = Operation_hash.Table.clear t
  end

  (* Types plugged into the generic worker: launch parameters, internal
     state, and the externally visible view of that state. *)
  module Types = struct
    type parameters = {
      limits : limits;
      chain_db : Distributed_db.chain_db;
      validation_state : Proto.validation_state;
    }

    (* internal worker state *)
    type state = {
      (* state of the validator. this is updated at each apply_operation *)
      mutable validation_state : Proto.validation_state;
      cache : ValidatedCache.t;
      (* live blocks and operations, initialized at worker launch *)
      live_blocks : Block_hash.Set.t;
      live_operations : Operation_hash.Set.t;
      operation_stream :
        (result * Operation.shell_header * Proto.operation_data)
        Lwt_watcher.input;
      parameters : parameters;
    }

    (* Only the cache of validated operations is exposed. *)
    type view = {cache : ValidatedCache.t}

    (* The view shares the state's cache; the parameters are ignored. *)
    let view (state : state) _ : view =
      let cache = state.cache in
      {cache}

    (* A view is encoded exactly as the cache it wraps. *)
    let encoding =
      Data_encoding.conv
        (fun (v : view) -> v.cache)
        (fun cache -> ({cache} : view))
        ValidatedCache.encoding

    (* Prints the cache entries, each followed by ";" and a break hint. *)
    let pp ppf (v : view) =
      let separator ppf =
        Format.pp_print_string ppf ";" ;
        Format.pp_print_space ppf ()
      in
      ValidatedCache.pp separator ppf v.cache
  end

  (* Instantiation of the generic worker with the modules defined above. *)
  module Logger = Worker_logger.Make (Event) (Request)
  module Worker = Worker.Make (Name) (Event) (Request) (Types) (Logger)
  open Types

  (* A mempool worker is a generic worker fed through a queue
     (presumably unbounded, going by [Worker.infinite] -- TODO confirm). *)
  type t = Worker.infinite Worker.queue Worker.t

  (* Cache of parsed operations. It is created once per application of the
     [Make] functor, hence shared by all the workers of that instance. *)
  let parsed_cache = ParsedCache.create ()

  (* Delegates to the generic worker's shutdown. *)
  let shutdown w = Worker.shutdown w

  (*** prevalidation ****)
  open Validation_errors

  (* [create ?protocol_data ~predecessor ~timestamp ()] starts the
     construction of a block on top of [predecessor] and returns the
     resulting protocol validation state.
     [protocol_data], when given, is the binary-encoded block header data;
     the call fails with "Invalid block header" if it cannot be decoded.
     Fails as well if the context of [predecessor] cannot be fetched. *)
  let create ?protocol_data ~predecessor ~timestamp () =
    let { Block_header.shell =
            { fitness = predecessor_fitness;
              timestamp = predecessor_timestamp;
              level = predecessor_level;
              _ };
          _ } =
      State.Block.header predecessor
    in
    (* Decodes the optional block header data. *)
    let decode_protocol_data = function
      | None ->
          return_none
      | Some encoded -> (
        match
          Data_encoding.Binary.of_bytes
            Proto.block_header_data_encoding
            encoded
        with
        | Some decoded ->
            return_some decoded
        | None ->
            failwith "Invalid block header" )
    in
    State.Block.context predecessor
    >>=? fun disk_context ->
    decode_protocol_data protocol_data
    >>=? fun protocol_data ->
    Proto.begin_construction
      ~chain_id:(State.Block.chain_id predecessor)
      ~predecessor_context:(Shell_context.wrap_disk_context disk_context)
      ~predecessor_timestamp
      ~predecessor_fitness
      ~predecessor_level
      ~predecessor:(State.Block.hash predecessor)
      ~timestamp
      ?protocol_data
      ()

  (* [apply_operation state op] computes the verdict for [op] without
     modifying [state]. It returns a pair: the new validation state, present
     only when the operation was applied, and the verdict.
     - [Duplicate] if the hash of [op] is among the live operations;
     - [Not_in_branch] if the branch of [op] is not among the live blocks;
     - otherwise the protocol is run: [Applied] on success, and on failure
       the verdict selected by [classify_errors]. *)
  let apply_operation state op =
    let verdict_of_errors errors =
      match classify_errors errors with
      | `Branch ->
          Branch_refused errors
      | `Permanent ->
          Refused errors
      | `Temporary ->
          Branch_delayed errors
    in
    match Operation_hash.Set.mem op.hash state.live_operations with
    | true ->
        Lwt.return (None, Duplicate)
    | false -> (
      match
        Block_hash.Set.mem op.raw.Operation.shell.branch state.live_blocks
      with
      | false ->
          Lwt.return (None, Not_in_branch)
      | true -> (
          Proto.apply_operation
            state.validation_state
            {shell = op.raw.shell; protocol_data = op.protocol_data}
          >|= function
          | Ok (validation_state, receipt) ->
              (Some validation_state, Applied receipt)
          | Error errors ->
              (None, verdict_of_errors errors) ) )

  (*** end prevalidation ***)

  (* [parse_helper raw_op] builds a parsed operation from [raw_op], without
     any memoization. It fails with [Oversized_operation] when the binary
     size of [raw_op] exceeds [Proto.max_operation_data_length], and with
     [Parse_error] when its protocol data cannot be decoded. *)
  let parse_helper raw_op =
    let hash = Operation.hash raw_op in
    let max = Proto.max_operation_data_length in
    let size = Data_encoding.Binary.length Operation.encoding raw_op in
    if size <= max then (
      match
        Data_encoding.Binary.of_bytes
          Proto.operation_data_encoding
          raw_op.Operation.proto
      with
      | Some protocol_data ->
          ok {hash; raw = raw_op; protocol_data}
      | None ->
          error Parse_error )
    else error (Oversized_operation {size; max})

  (* Validates [parsed_op] against the worker's current validation state and
     returns the verdict. This function updates the internal state of the
     worker: when the operation is applied, the validation state is replaced
     by the one returned by the protocol. *)
  let validate_helper w parsed_op =
    let state = Worker.state w in
    apply_operation state parsed_op
    >|= fun (updated_state, result) ->
    Option.iter
      ~f:(fun validation_state -> state.validation_state <- validation_state)
      updated_state ;
    result

  (* Private helper of [on_validate]: pushes the verdict of [parsed_op] on
     the worker's operation stream. The protocol data already decoded by the
     parser is reused, so nothing is deserialized (and nothing can raise)
     here. *)
  let notify_helper w result parsed_op =
    let state = Worker.state w in
    Lwt_watcher.notify
      state.operation_stream
      (result, parsed_op.raw.Operation.shell, parsed_op.protocol_data)

  (* [on_validate w parsed_op] returns the verdict for [parsed_op].
     Memoization is done only at this on_* level: an operation whose cached
     verdict is anything but [Branch_delayed] is answered from the cache,
     without notification. An unknown or previously branch-delayed operation
     is (re-)validated, its verdict is stored in the cache, and it is
     notified on the operation stream. *)
  let on_validate w parsed_op =
    let state = Worker.state w in
    match ValidatedCache.find_opt state.cache parsed_op with
    | None | Some (Branch_delayed _, _) ->
        validate_helper w parsed_op
        >>= fun result ->
        ValidatedCache.add state.cache parsed_op (result, parsed_op.raw) ;
        (* a notification is sent for every validation actually performed,
           which includes each retry of a branch-delayed operation *)
        notify_helper w result parsed_op ;
        Lwt.return result
    | Some
        ( ( (Applied _ | Branch_refused _ | Refused _ | Duplicate
            | Not_in_branch) as result ),
          _ ) ->
        Lwt.return result

  (* worker's handlers *)
  (* Request dispatcher: a [Validate] request is served by [on_validate],
     whose verdict is always wrapped in a success. *)
  let on_request : type r. t -> r Request.t -> r tzresult Lwt.t =
   fun w -> function
    | Request.Validate parsed_op ->
        on_validate w parsed_op >>= fun verdict -> return verdict

  (* Builds the initial worker state from the launch parameters. The live
     blocks and live operations are read once from the chain data, and every
     live operation is dropped from the parsed-operation cache. The worker
     starts with an empty validated cache and a fresh operation stream. *)
  let on_launch (_ : t) (_ : Name.t) (parameters : parameters) =
    let chain_state = Distributed_db.chain_state parameters.chain_db in
    Chain.data chain_state
    >>= fun {current_mempool = _mempool; live_blocks; live_operations; _} ->
    (* remove all operations that are already included *)
    Operation_hash.Set.iter (ParsedCache.rem parsed_cache) live_operations ;
    return
      {
        validation_state = parameters.validation_state;
        cache = ValidatedCache.create ();
        live_blocks;
        live_operations;
        operation_stream = Lwt_watcher.create_input ();
        parameters;
      }

  (* Tear-down handler: shuts the operation stream down, calls
     [Distributed_db.Operation.clear_or_cancel] on every hash of the
     validated cache, then empties that cache. *)
  let on_close w =
    let state = Worker.state w in
    let clear_or_cancel hash _ =
      Distributed_db.Operation.clear_or_cancel state.parameters.chain_db hash
    in
    Lwt_watcher.shutdown_input state.operation_stream ;
    ValidatedCache.iter clear_or_cancel state.cache ;
    ValidatedCache.clear state.cache ;
    Lwt.return_unit

  (* Records the failed request together with its errors, then propagates
     the errors. *)
  let on_error w request_view status errors =
    let event = Event.Request (request_view, status, Some errors) in
    Worker.record_event w event ;
    Lwt.return_error errors

  (* Records the completed request, without errors; the answer of the
     request is ignored. *)
  let on_completion w request _ status =
    let event = Event.Request (Request.view request, status, None) in
    Worker.record_event w event ;
    Lwt.return_unit

  (* Worker table passed to [Worker.launch]; created once per application of
     the functor, with the [Queue] kind. *)
  let table = Worker.create_table Queue

  (* [create limits chain_db] launches a mempool worker named after the id
     of the chain of [chain_db]. Its initial validation state is obtained by
     starting a block construction on top of the current head, using the
     current system time as timestamp. Fails if that construction fails. *)
  let create limits chain_db =
    let module Handlers = struct
      type self = t

      let on_launch = on_launch

      let on_close = on_close

      let on_error = on_error

      let on_completion = on_completion

      let on_no_request _ = return_unit

      let on_request = on_request
    end in
    let chain_state = Distributed_db.chain_state chain_db in
    Chain.data chain_state
    >>= fun {current_head; _} ->
    let timestamp = Time.System.to_protocol (Systime_os.now ()) in
    create ~predecessor:current_head ~timestamp ()
    >>=? fun validation_state ->
    let parameters = {limits; chain_db; validation_state} in
    Worker.launch
      table
      limits.worker_limits
      (State.Chain.id chain_state)
      parameters
      (module Handlers)

  (* Exporting functions *)

  (* Pushes a [Validate] request on the queue of worker [t] and waits for
     its answer. *)
  let validate t parsed_op =
    let request = Request.Validate parsed_op in
    Worker.Queue.push_request_and_wait t request

  (* Memoized parsing: the result for [raw_op] is taken from the parsed
     cache when present; otherwise it is computed by [parse_helper] and
     stored in the cache, whether it is a success or a failure. *)
  let parse raw_op =
    match ParsedCache.find_opt parsed_cache raw_op with
    | Some cached ->
        cached
    | None ->
        let fresh = parse_helper raw_op in
        ParsedCache.add parsed_cache raw_op fresh ;
        fresh

  (* The distributed database handle found in the launch parameters of
     worker [t]. *)
  let chain_db t = (Worker.state t).parameters.chain_db

  (* Directory serving the pending-operations service: answers with the
     content of the worker's validated cache, converted by
     [ValidatedCache.to_mempool]. *)
  let pending_rpc_directory : t RPC_directory.t =
    let answer_pending w () () =
      let state = Worker.state w in
      RPC_answer.return (ValidatedCache.to_mempool state.cache)
    in
    RPC_directory.gen_register
      RPC_directory.empty
      (Proto_services.S.Mempool.pending_operations RPC_path.open_root)
      answer_pending

  (* Directory serving the monitor-operations service: streams the
     operations notified by the worker whose verdict is selected by the
     query flags ([applied], [refused], [branch_refused], [branch_delayed]).
     [Duplicate] and [Not_in_branch] operations are never reported. *)
  let monitor_rpc_directory : t RPC_directory.t =
    RPC_directory.gen_register
      RPC_directory.empty
      (Proto_services.S.Mempool.monitor_operations RPC_path.open_root)
      (fun w params () ->
        let state = Worker.state w in
        let filter_result = function
          | Applied _ ->
              params#applied
          | Refused _ ->
              params#refused
          | Branch_refused _ ->
              params#branch_refused
          | Branch_delayed _ ->
              params#branch_delayed
          | Duplicate | Not_in_branch ->
              false
        in
        let (op_stream, stopper) =
          Lwt_watcher.create_stream state.operation_stream
        in
        let shutdown () = Lwt_watcher.shutdown stopper in
        (* An operation that is filtered out must be skipped: answering
           [None] for it would end the RPC stream. [None] is returned only
           when the underlying stream itself is over. *)
        let rec next () =
          Lwt_stream.get op_stream
          >>= function
          | Some (kind, shell, protocol_data) when filter_result kind ->
              Lwt.return_some [{Proto.shell; protocol_data}]
          | Some _ ->
              next ()
          | None ->
              Lwt.return_none
        in
        RPC_answer.return_stream {next; shutdown})

  (* Exported RPC directory: the union of the pending-operations and
     monitor-operations directories.
     /mempool/<chain_id>/pending
     /mempool/<chain_id>/monitor *)
  let rpc_directory =
    RPC_directory.merge pending_rpc_directory monitor_rpc_directory
end
(* back to top *)