Raw File
requester.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2021 Nomadic Labs, <contact@nomadic-labs.com>               *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** Interface for looking up values indexed by a [key] and fetching the
    missing ones from peers. *)
module type REQUESTER = sig
  (** State of a requester. *)
  type t

  (** Identifier of a value. *)
  type key

  (** Values handled by the requester. *)
  type value

  (** Extra data supplied to every [fetch]. *)
  type param

  (** Error reported by [read] when no value is available for a key. *)
  type error += Missing_data of key

  (** Error resolving a [fetch] whose request has been cancelled. *)
  type error += Canceled of key

  (** Error resolving a [fetch] that did not complete within its timeout. *)
  type error += Timeout of key

  (** [known t k] tells whether a value for [k] is already available. *)
  val known : t -> key -> bool Lwt.t

  (** [read t k] returns the value for [k], failing if it is not available. *)
  val read : t -> key -> value tzresult Lwt.t

  (** Like [read], but returns [None] instead of failing. *)
  val read_opt : t -> key -> value option Lwt.t

  (** [inject t k v] stores [v] for [k]; the boolean tells whether the value
      was actually stored. *)
  val inject : t -> key -> value -> bool Lwt.t

  (** [fetch t ?peer ?timeout k param] returns the value for [k], requesting
      it from peers ([peer] being a hint) when it is not available locally. *)
  val fetch :
    t ->
    ?peer:P2p_peer.Id.t ->
    ?timeout:Time.System.Span.t ->
    key ->
    param ->
    value tzresult Lwt.t

  (** Drops a found value, or releases one pending fetch of the key. *)
  val clear_or_cancel : t -> key -> unit
end

(** A [REQUESTER] extended with the operations needed to drive it: creation,
    shutdown, delivery of values received from peers, and monitoring. *)
module type FULL_REQUESTER = sig
  include REQUESTER

  (** Persistent storage consulted when a key is not held in memory. *)
  type store

  (** Parameters of the underlying request scheduler. *)
  type request_param

  (** Values as received from peers, before validation. *)
  type notified_value

  (** [create ?random_table ?global_input request_param store] builds a
      requester backed by [store]. Values found through notifications are
      also pushed to [global_input] when it is given. *)
  val create :
    ?random_table:bool ->
    ?global_input:(key * value) Lwt_watcher.input ->
    request_param ->
    store ->
    t

  (** Stops the background request scheduler. *)
  val shutdown : t -> unit Lwt.t

  (** [pending t k] is [true] while a fetch of [k] is in progress. *)
  val pending : t -> key -> bool

  (** Stream of the [(key, value)] pairs obtained through [notify]. *)
  val watch : t -> (key * value) Lwt_stream.t * Lwt_watcher.stopper

  (** [notify t peer k v] hands the value [v] received from [peer] for [k]
      to the requester. *)
  val notify : t -> P2p_peer.Id.t -> key -> notified_value -> unit Lwt.t

  (** Number of entries (pending or found) held in memory. *)
  val memory_table_length : t -> int

  (** Number of requests currently tracked by the scheduler. *)
  val pending_requests : t -> int
end

(** Read-only view of the persistent store consulted by the requester. *)
module type DISK_TABLE = sig
  (** Handle on the persistent store. *)
  type store

  (** Identifier of a stored value. *)
  type key

  (** Stored values. *)
  type value

  (** [known store k] tells whether [store] holds a value for [k]. *)
  val known : store -> key -> bool Lwt.t

  (** [read store k] returns the value for [k], failing if it is absent. *)
  val read : store -> key -> value tzresult Lwt.t

  (** Like [read], but returns [None] when the value is absent. *)
  val read_opt : store -> key -> value option Lwt.t
end

(** Mutable in-memory table indexed by [key], used both by the scheduler
    (pending requests) and by the requester (pending and found values). *)
module type MEMORY_TABLE = sig
  (** A table holding values of type ['a]. *)
  type 'a t

  (** Keys of the table. *)
  type key

  (** [create ~entry_type ?random size] builds an empty table.
      NOTE(review): [entry_type] looks like a descriptive label and the
      integer an initial size hint — confirm against the implementations. *)
  val create : entry_type:string -> ?random:bool -> int -> 'a t

  (** [find t k] returns the binding of [k], if any. *)
  val find : 'a t -> key -> 'a option

  (** [add t k v] binds [k] to [v]. *)
  val add : 'a t -> key -> 'a -> unit

  (** [replace t k v] binds [k] to [v], replacing any existing binding. *)
  val replace : 'a t -> key -> 'a -> unit

  (** [remove t k] drops the binding of [k]. *)
  val remove : 'a t -> key -> unit

  (** Folds over every binding of the table. *)
  val fold : (key -> 'a -> 'b -> 'b) -> 'a t -> 'b -> 'b

  (** Number of bindings in the table. *)
  val length : 'a t -> int
end

(** Background scheduler sending (and re-sending) requests to peers. *)
module type SCHEDULER = sig
  (** State of a scheduler. *)
  type t

  (** Identifier of a requested value. *)
  type key

  (** Parameters given at creation. *)
  type param

  (** Creates a scheduler and starts its worker. *)
  val create : param -> t

  (** Stops the worker. *)
  val shutdown : t -> unit Lwt.t

  (** [request t peer k] asks for [k], optionally hinting a [peer]. *)
  val request : t -> P2p_peer.Id.t option -> key -> unit

  (** Reports that [k] has been received from a peer. *)
  val notify : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  (** Reports that the request for [k] is no longer needed. *)
  val notify_cancellation : t -> key -> unit

  (** Reports that a peer sent [k] although it was not requested. *)
  val notify_unrequested : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  (** Reports that a peer sent [k] although it was already known. *)
  val notify_duplicate : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  (** Reports that a peer sent an invalid value for [k]. *)
  val notify_invalid : t -> P2p_peer.Id.t -> key -> unit Lwt.t

  (** Number of requests currently tracked. *)
  val pending_requests : t -> int
end

(** Validation of the values received from peers. *)
module type PROBE = sig
  (** Identifier of a requested value. *)
  type key

  (** Parameter supplied with the fetch of a value. *)
  type param

  (** Value as received from a peer. *)
  type notified_value

  (** Validated value. *)
  type value

  (** [probe k param v] returns the validated value, or [None] when [v] is
      not acceptable for [k]. *)
  val probe : key -> param -> notified_value -> value option
end

(** How the scheduler reaches peers. *)
module type REQUEST = sig
  (** Identifier of a requested value. *)
  type key

  (** Context needed to reach peers. *)
  type param

  (** Delay before the first retry of a request; the scheduler increases it
      after every retry. *)
  val initial_delay : Time.System.Span.t

  (** Set of peers requests can currently be sent to. *)
  val active : param -> P2p_peer.Set.t

  (** [send param peer keys] asks [peer] for all of [keys]. *)
  val send : param -> P2p_peer.Id.t -> key list -> unit
end

(** Keys used to index requested values. *)
module type HASH = sig
  (** A key. *)
  type t

  (** Human-readable name, used in error identifiers and messages. *)
  val name : string

  (** Binary/JSON encoding of a key. *)
  val encoding : t Data_encoding.t

  (** Pretty-printer for a key. *)
  val pp : Format.formatter -> t -> unit
end

(** The requester uses a generic scheduler to schedule its requests.
    The scheduler and the requester built by [Make] are instantiated with the
    same [MEMORY_TABLE] implementation: the scheduler creates its own table
    to track pending requests, while the requester creates another one to
    store both pending fetches and found values. *)

module Make_request_scheduler
    (Hash : HASH)
    (Table : MEMORY_TABLE with type key := Hash.t)
    (Request : REQUEST with type key := Hash.t) : sig
  include SCHEDULER with type key := Hash.t and type param := Request.param
end = struct
  module Events = Requester_event.Make (Hash)

  type key = Hash.t

  type t = {
    param : Request.param;
    pending : status Table.t;
    (* Requests that have been neither answered nor cancelled yet. *)
    mutable min_next_request : Time.System.t option;
    (* The time of the next pending request to timeout. *)
    queue : event Lwt_pipe.Unbounded.t;
    (* Events pushed by the public functions, consumed by the worker. *)
    mutable events : event list Lwt.t;
    (* Batch of queued events the worker is currently waiting for. *)
    canceler : Lwt_canceler.t;
    mutable worker : unit Lwt.t;
  }

  and status = {
    peers : P2p_peer.Set.t;
    (* Peers hinted for this request; when empty, any active peer is asked. *)
    next_request : Time.System.t;
    (* When the request is due to be (re)sent. *)
    delay : Time.System.Span.t;
    (* Delay used to schedule the retry following the next send. *)
  }

  and event =
    | Request of P2p_peer.Id.t option * key
    | Notify of P2p_peer.Id.t * key
    | Notify_cancellation of key
    | Notify_invalid of P2p_peer.Id.t * key
    | Notify_duplicate of P2p_peer.Id.t * key
    | Notify_unrequested of P2p_peer.Id.t * key

  let request t p k = Lwt_pipe.Unbounded.push t.queue (Request (p, k))

  let notify t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify (p, k)) ;
    Lwt.return ()

  (* [notify_cancellation] is used within non-Lwt context and needs to
     perform logging without yielding. We use
     [emit__dont_wait__use_with_care] to that end. Other events are used
     within Lwt context so we use the recommended [emit] for them. *)
  let notify_cancellation t k =
    Events.(emit__dont_wait__use_with_care notify_push_cancellation) k ;
    Lwt_pipe.Unbounded.push t.queue (Notify_cancellation k)

  let notify_invalid t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_invalid) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_invalid (p, k)) ;
    Lwt.return ()

  let notify_duplicate t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_duplicate) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_duplicate (p, k)) ;
    Lwt.return ()

  let notify_unrequested t p k =
    let open Lwt_syntax in
    let* () = Events.(emit notify_push_unrequested) (k, p) in
    Lwt_pipe.Unbounded.push t.queue (Notify_unrequested (p, k)) ;
    Lwt.return ()

  (** Promise resolved when the earliest pending request is due: never
      resolved (but cancelable) when there is no known deadline, immediately
      resolved when the deadline is already past. *)
  let compute_timeout state =
    match state.min_next_request with
    | None -> fst @@ Lwt.task ()
    | Some next ->
        let now = Time.System.now () in
        let delay = Ptime.diff next now in
        if Ptime.Span.compare delay Ptime.Span.zero <= 0 then Lwt.return_unit
        else Systime_os.sleep delay

  (** Applies one queued event to the pending table. A (re-)request is made
      due immediately ([next_request = now]) with a reset delay. *)
  let process_event state now =
    let open Lwt_syntax in
    function
    | Request (peer, key) -> (
        let* () = Events.(emit registering_request) (key, peer) in
        match Table.find state.pending key with
        | Some data ->
            let peers =
              match peer with
              | None -> data.peers
              | Some peer -> P2p_peer.Set.add peer data.peers
            in
            Table.replace
              state.pending
              key
              {peers; next_request = now; delay = Request.initial_delay} ;
            Events.(emit registering_request_replaced) (key, peer)
        | None ->
            let peers =
              match peer with
              | None -> P2p_peer.Set.empty
              | Some peer -> P2p_peer.Set.singleton peer
            in
            Table.replace
              state.pending
              key
              {peers; next_request = now; delay = Request.initial_delay} ;
            Events.(emit registering_request_added) (key, peer))
    | Notify (peer, key) ->
        Table.remove state.pending key ;
        Events.(emit notify_received) (key, peer)
    | Notify_cancellation key ->
        Table.remove state.pending key ;
        Events.(emit notify_cancelled) key
    | Notify_invalid (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_invalid) (key, peer)
    | Notify_unrequested (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_unrequested) (key, peer)
    | Notify_duplicate (peer, key) ->
        (* TODO: Punish peer *)
        Events.(emit notify_duplicate) (key, peer)

  (* Table updates collected during [Table.fold] and applied afterwards,
     so that the table is not mutated while it is being traversed. *)
  type update_table_action =
    | Replace of {key : key; status : status}
    | Remove of {key : key}

  (** Main loop: waits for queued events, the next request deadline or
      shutdown, then either processes the events or (re)sends the due
      requests. *)
  let worker_loop state =
    let open Lwt_syntax in
    let shutdown = Lwt_canceler.when_canceling state.canceler in
    let rec loop state =
      (* It is possible that numerous pending requests may be canceled
         sequentially. If this occurs, we will recalculate the
         subsequent timeout for each cancellation. Calculating the
         next timeout could be resource-intensive. By allowing for a
         brief sleep, multiple cancellations can take place
         simultaneously.

         Note: using `Lwt.pause` or `Lwt.yield` might not be
         sufficient, e.g., when the scheduler does not timeout
         cancelers are not given a chance to be executed.

         Note: This constant was selected using the sophisticated
         "damp digit" method. *)
      let* () = Lwt_unix.sleep 0.001 in
      let timeout = compute_timeout state in
      let* () =
        Lwt.choose
          [
            (let* _ = state.events in
             Lwt.return_unit);
            timeout;
            shutdown;
          ]
      in
      (* [Lwt.choose] does not cancel the promises that lost the race, and
         [Lwt.pick] cannot be used since it would cancel [shutdown] and
         [state.events]. Cancel the timeout explicitly so that its timer
         does not linger when we were woken up by events or shutdown; this
         is a no-op when the timeout is what resolved. *)
      Lwt.cancel timeout ;
      if Lwt.state shutdown <> Lwt.Sleep then Events.(emit terminated) ()
      else if Lwt.state state.events <> Lwt.Sleep then (
        let now = Time.System.now () in
        let* events = state.events in
        state.events <- Lwt_pipe.Unbounded.pop_all state.queue ;
        let* () = List.iter_s (process_event state now) events in
        (* Requests are either added or deleted: either way, we need
           to go through the table to update the timeout. Setting it
           to now do just that. As a consequence, of that, the next
           call to `compute_timeout` will always return
           instantaneously. *)
        state.min_next_request <- Some now ;
        loop state)
      else
        let* () = Events.(emit timeout) () in
        let now = Time.System.now () in
        let active_peers = Request.active state.param in
        let compute_new_min_next_request min_next_request next_request =
          match min_next_request with
          | None -> Some next_request
          | Some min_next_request' ->
              if Ptime.is_earlier min_next_request' ~than:next_request then
                min_next_request
              else Some next_request
        in
        let actions, min_next_request, requests =
          Table.fold
            (fun key
                 {peers; next_request; delay}
                 (actions, min_next_request, acc) ->
              if Ptime.is_later next_request ~than:now then
                ( actions,
                  compute_new_min_next_request min_next_request next_request,
                  acc )
              else
                let remaining_peers = P2p_peer.Set.inter peers active_peers in
                if
                  P2p_peer.Set.is_empty remaining_peers
                  && not (P2p_peer.Set.is_empty peers)
                then
                  (* All the hinted peers are gone: drop the request. *)
                  (Remove {key} :: actions, min_next_request, acc)
                else
                  (* Without a (still active) hinted peer, any active peer
                     may be asked. *)
                  let candidates =
                    if P2p_peer.Set.is_empty remaining_peers then active_peers
                    else remaining_peers
                  in
                  let next_request =
                    Option.value ~default:Ptime.max (Ptime.add_span now delay)
                  in
                  let min_next_request =
                    compute_new_min_next_request min_next_request next_request
                  in
                  if P2p_peer.Set.is_empty candidates then
                    (* No active peer at all: nobody can be asked now, and
                       [random_elt] must not be called on an empty set.
                       Keep the request and retry after [delay]; the delay
                       is not increased since nothing was sent. *)
                    ( Replace
                        {
                          key;
                          status = {peers = remaining_peers; next_request; delay};
                        }
                      :: actions,
                      min_next_request,
                      acc )
                  else
                    let requested_peer =
                      P2p_peer.Id.Set.random_elt candidates
                    in
                    let next =
                      {
                        peers = remaining_peers;
                        next_request;
                        delay = Time.System.Span.multiply_exn 1.5 delay;
                      }
                    in
                    let new_acc =
                      P2p_peer.Map.update
                        requested_peer
                        (function
                          | None -> Some [key] | Some l -> Some (key :: l))
                        acc
                    in
                    (Replace {key; status = next} :: actions, min_next_request, new_acc))
            state.pending
            ([], None, P2p_peer.Map.empty)
        in
        (* Update pending table *)
        List.iter
          (function
            | Remove {key} -> Table.remove state.pending key
            | Replace {key; status} -> Table.replace state.pending key status)
          actions ;
        state.min_next_request <- min_next_request ;
        P2p_peer.Map.iter (Request.send state.param) requests ;
        let* () =
          P2p_peer.Map.iter_s
            (fun peer request ->
              List.iter_s
                (fun (key : key) -> Events.(emit requested) (key, peer))
                request)
            requests
        in
        loop state
    in
    loop state

  let create param =
    let state =
      {
        param;
        queue = Lwt_pipe.Unbounded.create ();
        min_next_request = None;
        pending = Table.create ~entry_type:"pending_requests" ~random:true 17;
        events = Lwt.return_nil;
        canceler = Lwt_canceler.create ();
        worker = Lwt.return_unit;
      }
    in
    state.worker <-
      Lwt_utils.worker
        "db_request_scheduler"
        ~on_event:Internal_event.Lwt_worker_logger.on_event
        ~run:(fun () -> worker_loop state)
        ~cancel:(fun () -> Error_monad.cancel_with_exceptions state.canceler) ;
    state

  let shutdown s = Error_monad.cancel_with_exceptions s.canceler

  let pending_requests s = Table.length s.pending
end

(** Builds a requester: values are looked up in memory, then in the
    [Disk_table] store, and otherwise requested from peers through a
    [Make_request_scheduler] instance. Values received from peers are
    validated with [Probe.probe]. *)
module Make
    (Hash : HASH)
    (Disk_table : DISK_TABLE with type key := Hash.t)
    (Memory_table : MEMORY_TABLE with type key := Hash.t)
    (Request : REQUEST with type key := Hash.t)
    (Probe : PROBE with type key := Hash.t and type value := Disk_table.value) :
  FULL_REQUESTER
    with type key = Hash.t
     and type value = Disk_table.value
     and type param = Probe.param
     and type request_param = Request.param
     and type notified_value = Probe.notified_value
     and type store = Disk_table.store = struct
  type key = Hash.t

  type value = Disk_table.value

  type param = Probe.param

  type request_param = Request.param

  type notified_value = Probe.notified_value

  type store = Disk_table.store

  module Scheduler = Make_request_scheduler (Hash) (Memory_table) (Request)

  type t = {
    scheduler : Scheduler.t;
    disk : Disk_table.store;
    memory : status Memory_table.t;
    global_input : (key * value) Lwt_watcher.input option;
    input : (key * value) Lwt_watcher.input;
  }

  and status =
    | Pending of {
        waiter : value tzresult Lwt.t;
        wakener : value tzresult Lwt.u;
        mutable waiters : int;
        (* Number of outstanding [fetch] calls sharing [waiter]. *)
        param : param;
      }
    | Found of value

  (* A pending fetch does not make a key known. *)
  let known s k =
    match Memory_table.find s.memory k with
    | None -> Disk_table.known s.disk k
    | Some (Pending _) -> Lwt.return_false
    | Some (Found _) -> Lwt.return_true

  let read_opt s k =
    match Memory_table.find s.memory k with
    | None -> Disk_table.read_opt s.disk k
    | Some (Found v) -> Lwt.return_some v
    | Some (Pending _) -> Lwt.return_none

  type error += Missing_data of key

  type error += Canceled of key

  type error += Timeout of key

  let () =
    (* Missing data key *)
    register_error_kind
      `Permanent
      ~id:("requester." ^ Hash.name ^ ".missing")
      ~title:("Missing " ^ Hash.name)
      ~description:("Some " ^ Hash.name ^ " is missing from the requester")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Missing %s %a" Hash.name Hash.pp key)
      (Data_encoding.obj1 (Data_encoding.req "key" Hash.encoding))
      (function Missing_data key -> Some key | _ -> None)
      (fun key -> Missing_data key) ;
    (* Canceled key *)
    register_error_kind
      `Permanent
      ~title:("Canceled fetch of a " ^ Hash.name)
      ~description:("The fetch of a " ^ Hash.name ^ " has been canceled")
      ~id:("requester." ^ Hash.name ^ ".fetch_canceled")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Fetch of %s %a canceled" Hash.name Hash.pp key)
      Data_encoding.(obj1 (req "key" Hash.encoding))
      (function Canceled key -> Some key | _ -> None)
      (fun key -> Canceled key) ;
    (* Timeout key *)
    register_error_kind
      `Permanent
      ~title:("Timed out fetch of a " ^ Hash.name)
      ~description:("The fetch of a " ^ Hash.name ^ " has timed out")
      ~id:("requester." ^ Hash.name ^ ".fetch_timeout")
      ~pp:(fun ppf key ->
        Format.fprintf ppf "Fetch of %s %a timed out" Hash.name Hash.pp key)
      Data_encoding.(obj1 (req "key" Hash.encoding))
      (function Timeout key -> Some key | _ -> None)
      (fun key -> Timeout key)

  let read s k =
    let open Lwt_result_syntax in
    match Memory_table.find s.memory k with
    | None -> trace (Missing_data k) @@ Disk_table.read s.disk k
    | Some (Found v) -> return v
    | Some (Pending _) -> tzfail (Missing_data k)

  (** [wrap s k ?timeout t] returns the promise handed to one [fetch] of [k]
      waiting on [t]. [t] is protected, so cancelling the returned promise
      only releases this fetch: the last release removes the entry, cancels
      the scheduler request and resolves [t] with [Canceled k]. With
      [?timeout], the result fails with [Timeout k] after the delay;
      [Lwt.pick] then cancels the protected promise, which runs the same
      release logic. *)
  let wrap s k ?timeout t =
    let open Lwt_syntax in
    let t = Lwt.protected t in
    Lwt.on_cancel t (fun () ->
        match Memory_table.find s.memory k with
        | None -> ()
        | Some (Found _) -> ()
        | Some (Pending ({wakener = w; _} as data)) ->
            data.waiters <- data.waiters - 1 ;
            if data.waiters = 0 then (
              Memory_table.remove s.memory k ;
              Scheduler.notify_cancellation s.scheduler k ;
              Lwt.wakeup_later w (Result_syntax.tzfail (Canceled k)))) ;
    match timeout with
    | None -> t
    | Some delay ->
        let timeout =
          let* () = Systime_os.sleep delay in
          Lwt_result_syntax.tzfail (Timeout k)
        in
        Lwt.pick [t; timeout]

  (** Returns the value for [k] from memory or disk, or registers (or joins)
      a pending request and waits for it. *)
  let fetch s ?peer ?timeout k param =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* o = Disk_table.read_opt s.disk k in
        match o with
        | Some v -> return_ok v
        | None -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None ->
                let waiter, wakener = Lwt.wait () in
                Memory_table.add
                  s.memory
                  k
                  (Pending {waiter; wakener; waiters = 1; param}) ;
                Scheduler.request s.scheduler peer k ;
                wrap s k ?timeout waiter
            | Some (Pending data) ->
                Scheduler.request s.scheduler peer k ;
                data.waiters <- data.waiters + 1 ;
                wrap s k ?timeout data.waiter
            | Some (Found v) -> return_ok v))
    | Some (Pending data) ->
        Scheduler.request s.scheduler peer k ;
        data.waiters <- data.waiters + 1 ;
        wrap s k ?timeout data.waiter
    | Some (Found v) -> return_ok v

  (** Handles the value [v] sent by peer [p] for [k] while a fetch is
      pending on [w] with fetch parameter [param]. [v] is checked with
      [Probe.probe]: a rejected value is reported to the scheduler as
      invalid; an accepted one is stored as [Found], resolves every fetch
      waiting on [w] and is pushed to the watchers. *)
  let notify_when_pending s p k w param v =
    let open Lwt_syntax in
    match Probe.probe k param v with
    | None -> Scheduler.notify_invalid s.scheduler p k
    | Some value -> (
        let* () = Scheduler.notify s.scheduler p k in
        (* [Scheduler.notify] may yield. Meanwhile, another notification
           for [k], [clear_or_cancel] or the cancellation of the last
           waiting fetch may already have resolved [w] and changed the
           table entry. Resolving [w] a second time would raise
           [Invalid_argument], and re-adding [Found] would resurrect a
           cancelled key: only complete the request if it is still the one
           that was probed. *)
        match Memory_table.find s.memory k with
        | Some (Pending {wakener; _}) when wakener == w ->
            Memory_table.replace s.memory k (Found value) ;
            Lwt.wakeup_later w (Ok value) ;
            Option.iter
              (fun input -> Lwt_watcher.notify input (k, value))
              s.global_input ;
            Lwt_watcher.notify s.input (k, value) ;
            Lwt.return_unit
        | None | Some (Pending _) | Some (Found _) -> Lwt.return_unit)

  let notify s p k v =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* b = Disk_table.known s.disk k in
        match b with
        | true -> Scheduler.notify_duplicate s.scheduler p k
        | false -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None -> Scheduler.notify_unrequested s.scheduler p k
            | Some (Pending {wakener = w; param; _}) ->
                notify_when_pending s p k w param v
            | Some (Found _) -> Scheduler.notify_duplicate s.scheduler p k))
    | Some (Pending {wakener = w; param; _}) ->
        notify_when_pending s p k w param v
    | Some (Found _) -> Scheduler.notify_duplicate s.scheduler p k

  (** Stores [v] as found for [k] unless [k] is already on disk, pending or
      found; returns whether it was stored. *)
  let inject s k v =
    let open Lwt_syntax in
    match Memory_table.find s.memory k with
    | None -> (
        let* b = Disk_table.known s.disk k in
        match b with
        | true -> Lwt.return_false
        | false -> (
            (* It is necessary to check the memory-table again in case another
               promise has altered it whilst this one was waiting for the
               disk-table query. *)
            match Memory_table.find s.memory k with
            | None ->
                Memory_table.add s.memory k (Found v) ;
                Lwt.return_true
            | Some (Pending _) | Some (Found _) -> Lwt.return_false))
    | Some (Pending _) | Some (Found _) -> Lwt.return_false

  (** Drops a found value; for a pending fetch, releases one waiter and,
      for the last one, cancels the request and resolves the fetches with
      [Canceled k]. *)
  let clear_or_cancel s k =
    match Memory_table.find s.memory k with
    | None -> ()
    | Some (Pending status) ->
        if status.waiters <= 1 then (
          Scheduler.notify_cancellation s.scheduler k ;
          Memory_table.remove s.memory k ;
          Lwt.wakeup_later status.wakener (Result_syntax.tzfail (Canceled k)))
        else status.waiters <- status.waiters - 1
    | Some (Found _) -> Memory_table.remove s.memory k

  let watch s = Lwt_watcher.create_stream s.input

  (* [?random_table] is forwarded to the memory table as [?random]. *)
  let create ?random_table:random ?global_input request_param disk =
    let scheduler = Scheduler.create request_param in
    let memory = Memory_table.create ~entry_type:"entries" ?random 17 in
    let input = Lwt_watcher.create_input () in
    {scheduler; disk; memory; input; global_input}

  let pending s k =
    match Memory_table.find s.memory k with
    | None -> false
    | Some (Found _) -> false
    | Some (Pending _) -> true

  let memory_table_length s = Memory_table.length s.memory

  let pending_requests s = Scheduler.pending_requests s.scheduler

  let shutdown s = Scheduler.shutdown s.scheduler
end
back to top