https://gitlab.com/tezos/tezos
Raw File
Tip revision: 1817e57bf0059e3ec6ceb1ebe2a02975492bfda5 authored by Ilias Garnier on 08 December 2021, 09:54:35 UTC
Proto,SCORU: Add dummy inbox implementation
Tip revision: 1817e57
worker.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2018-2021 Nomadic Labs, <contact@nomadic-labs.com>          *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

(** An error returned when trying to communicate with a worker that
    has been closed.*)
type worker_name = {base : string; name : string}

type Error_monad.error += Closed of worker_name

let () =
  register_error_kind
    `Permanent
    ~id:"worker.closed"
    ~title:"Worker closed"
    ~description:
      "An operation on a worker could not complete before it was shut down."
    ~pp:(fun ppf w ->
      Format.fprintf ppf "Worker %s[%s] has been shut down." w.base w.name)
    Data_encoding.(
      conv
        (fun {base; name} -> (base, name))
        (fun (name, base) -> {base; name})
        (obj1 (req "worker" (tup2 string string))))
    (function Closed w -> Some w | _ -> None)
    (fun w -> Closed w)

module type T = sig
  module Name : Worker_intf.NAME

  module Event : Worker_intf.EVENT

  module Request : Worker_intf.REQUEST

  module Types : Worker_intf.TYPES

  (** A handle to a specific worker, parameterized by the type of
      internal message buffer. *)
  type 'kind t

  (** A handle to a table of workers. *)
  type 'kind table

  (** Internal buffer kinds used as parameters to {!t}. *)
  type 'a queue

  and bounded

  and infinite

  type dropbox

  (** Supported kinds of internal buffers. *)
  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  (** Create a table of workers. *)
  val create_table : 'kind buffer_kind -> 'kind table

  (** The callback handlers specific to each worker instance. *)
  module type HANDLERS = sig
    (** Placeholder replaced with {!t} with the right parameters
        provided by the type of buffer chosen at {!launch}.*)
    type self

    (** Builds the initial internal state of a worker at launch.
        It is possible to initialize the message queue.
        Of course calling {!state} will fail at that point. *)
    val on_launch :
      self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t

    (** The main request processor, i.e. the body of the event loop. *)
    val on_request : self -> 'a Request.t -> 'a tzresult Lwt.t

    (** Called when no request has been made before the timeout, if
        the parameter has been passed to {!launch}. *)
    val on_no_request : self -> unit tzresult Lwt.t

    (** A function called when terminating a worker. *)
    val on_close : self -> unit Lwt.t

    (** A function called at the end of the worker loop in case of an
        abnormal error. This function can handle the error by
        returning [Ok ()], or leave the default unexpected error
        behaviour by returning its parameter. A possibility is to
        handle the error for ad-hoc logging, and still use
        {!trigger_shutdown} to kill the worker. *)
    val on_error :
      self ->
      Request.view ->
      Worker_types.request_status ->
      error list ->
      unit tzresult Lwt.t

    (** A function called at the end of the worker loop in case of a
        successful treatment of the current request. *)
    val on_completion :
      self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t
  end

  (** Creates a new worker instance.
      Parameter [queue_size] not passed means unlimited queue. *)
  val launch :
    'kind table ->
    ?timeout:Time.System.Span.t ->
    Name.t ->
    Types.parameters ->
    (module HANDLERS with type self = 'kind t) ->
    'kind t tzresult Lwt.t

  (** Triggers a worker termination and waits for its completion.
      Cannot be called from within the handlers.  *)
  val shutdown : _ t -> unit Lwt.t

  module type BOX = sig
    type t

    val put_request : t -> 'a Request.t -> unit

    val put_request_and_wait : t -> 'a Request.t -> 'a tzresult Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait : 'q t -> 'a Request.t -> 'a tzresult Lwt.t

    val push_request : 'q t -> 'a Request.t -> unit Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox : sig
    include BOX with type t := dropbox t
  end

  module Queue : sig
    include QUEUE with type 'a t := 'a queue t

    (** Adds a message to the queue immediately. *)
    val push_request_now : infinite queue t -> 'a Request.t -> unit
  end

  (** Detects cancellation from within the request handler to stop
      asynchronous operations. *)
  val protect :
    _ t ->
    ?on_error:(error list -> 'b tzresult Lwt.t) ->
    (unit -> 'b tzresult Lwt.t) ->
    'b tzresult Lwt.t

  (** Exports the canceler to allow cancellation of other tasks when this
      worker is shut down or when it dies. *)
  val canceler : _ t -> Lwt_canceler.t

  (** Triggers a worker termination. *)
  val trigger_shutdown : _ t -> unit

  (** Record an event in the backlog. *)
  val record_event : _ t -> Event.t -> unit

  (** Record an event and make sure it is logged. *)
  val log_event : _ t -> Event.t -> unit Lwt.t

  (** Access the internal state, once initialized. *)
  val state : _ t -> Types.state

  (** Introspect the message queue, gives the times requests were pushed. *)
  val pending_requests : _ queue t -> (Time.System.t * Request.view) list

  (** Get the running status of a worker. *)
  val status : _ t -> Worker_types.worker_status

  (** Get the request being treated by a worker.
      Gives the time the request was pushed, and the time its
      treatment started. *)
  val current_request :
    _ t -> (Time.System.t * Time.System.t * Request.view) option

  val information : _ t -> Worker_types.worker_information

  (** Lists the running workers in this group. *)
  val list : 'a table -> (Name.t * 'a t) list

  (** [find_opt table n] is [Some worker] if the [worker] is in the [table] and
      has name [n]. *)
  val find_opt : 'a table -> Name.t -> 'a t option
end

module Make
    (Name : Worker_intf.NAME)
    (Event : Worker_intf.EVENT)
    (Request : Worker_intf.REQUEST)
    (Types : Worker_intf.TYPES)
    (Logger : Worker_intf.LOGGER
                with module Event = Event
                 and type Request.view = Request.view) =
struct
  module Name = Name
  module Event = Event
  module Request = Request
  module Types = Types
  module Logger = Logger

  module Nametbl = Hashtbl.MakeSeeded (struct
    type t = Name.t

    let hash = Hashtbl.seeded_hash

    let equal = Name.equal
  end)

  let base_name = String.concat "-" Name.base

  type message = Message : 'a Request.t * 'a tzresult Lwt.u option -> message

  type 'a queue

  and bounded

  and infinite

  type dropbox

  type _ buffer_kind =
    | Queue : infinite queue buffer_kind
    | Bounded : {size : int} -> bounded queue buffer_kind
    | Dropbox : {
        merge :
          dropbox t -> any_request -> any_request option -> any_request option;
      }
        -> dropbox buffer_kind

  and any_request = Any_request : _ Request.t -> any_request

  and _ buffer =
    | Queue_buffer :
        (Time.System.t * message) Lwt_pipe.Unbounded.t
        -> infinite queue buffer
    | Bounded_buffer :
        (Time.System.t * message) Lwt_pipe.Bounded.t
        -> bounded queue buffer
    | Dropbox_buffer : (Time.System.t * message) Lwt_dropbox.t -> dropbox buffer

  and 'kind t = {
    timeout : Time.System.Span.t option;
    parameters : Types.parameters;
    mutable (* only for init *) worker : unit Lwt.t;
    mutable (* only for init *) state : Types.state option;
    buffer : 'kind buffer;
    canceler : Lwt_canceler.t;
    name : Name.t;
    id : int;
    mutable status : Worker_types.worker_status;
    mutable current_request :
      (Time.System.t * Time.System.t * Request.view) option;
    logEvent : (module Internal_event.EVENT with type t = Logger.t);
    table : 'kind table;
  }

  and 'kind table = {
    buffer_kind : 'kind buffer_kind;
    mutable last_id : int;
    instances : 'kind t Nametbl.t;
  }

  let queue_item ?u r = (Systime_os.now (), Message (r, u))

  let drop_request w merge message_box request =
    try
      match
        match Lwt_dropbox.peek message_box with
        | None -> merge w (Any_request request) None
        | Some (_, Message (old, _)) ->
            Lwt.ignore_result (Lwt_dropbox.take message_box) ;
            merge w (Any_request request) (Some (Any_request old))
      with
      | None -> ()
      | Some (Any_request neu) ->
          Lwt_dropbox.put message_box (Systime_os.now (), Message (neu, None))
    with Lwt_dropbox.Closed -> ()

  let drop_request_and_wait w message_box request =
    let (t, u) = Lwt.wait () in
    Lwt.catch
      (fun () ->
        Lwt_dropbox.put message_box (queue_item ~u request) ;
        t)
      (function
        | Lwt_dropbox.Closed ->
            let name = Format.asprintf "%a" Name.pp w.name in
            fail (Closed {base = base_name; name})
        | exn ->
            (* [Lwt_dropbox.put] can only raise [Closed] which is caught above.
               We don't want to catch any other exception but we cannot use an
               incomplete pattern like we would in a [try]-[with] construct so
               we must explicitly match and re-raise [exn]. *)
            raise exn)

  module type BOX = sig
    type t

    val put_request : t -> 'a Request.t -> unit

    val put_request_and_wait : t -> 'a Request.t -> 'a tzresult Lwt.t
  end

  module type QUEUE = sig
    type 'a t

    val push_request_and_wait : 'q t -> 'a Request.t -> 'a tzresult Lwt.t

    val push_request : 'q t -> 'a Request.t -> unit Lwt.t

    val pending_requests : 'a t -> (Time.System.t * Request.view) list

    val pending_requests_length : 'a t -> int
  end

  module Dropbox = struct
    let put_request (w : dropbox t) request =
      let (Dropbox {merge}) = w.table.buffer_kind in
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request w merge message_box request

    let put_request_and_wait (w : dropbox t) request =
      let (Dropbox_buffer message_box) = w.buffer in
      drop_request_and_wait w message_box request
  end

  module Queue = struct
    let push_request (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue ->
          Lwt_pipe.Unbounded.push message_queue (queue_item request) ;
          (* because pushing on an unbounded pipe is immediate, we return within
             Lwt explicitly for compatibility with the other case *)
          Lwt.return_unit
      | Bounded_buffer message_queue ->
          Lwt_pipe.Bounded.push message_queue (queue_item request)

    let push_request_now (w : infinite queue t) request =
      let (Queue_buffer message_queue) = w.buffer in
      if Lwt_pipe.Unbounded.is_closed message_queue then ()
      else Lwt_pipe.Unbounded.push message_queue (queue_item request)

    let push_request_and_wait (type a) (w : a queue t) request =
      match w.buffer with
      | Queue_buffer message_queue -> (
          try
            let (t, u) = Lwt.wait () in
            Lwt_pipe.Unbounded.push message_queue (queue_item ~u request) ;
            t
          with Lwt_pipe.Closed ->
            let name = Format.asprintf "%a" Name.pp w.name in
            fail (Closed {base = base_name; name}))
      | Bounded_buffer message_queue ->
          let (t, u) = Lwt.wait () in
          Lwt.try_bind
            (fun () ->
              Lwt_pipe.Bounded.push message_queue (queue_item ~u request))
            (fun () -> t)
            (function
              | Lwt_pipe.Closed ->
                  let name = Format.asprintf "%a" Name.pp w.name in
                  fail (Closed {base = base_name; name})
              | exn -> raise exn)

    let pending_requests (type a) (w : a queue t) =
      let peeked =
        try
          match w.buffer with
          | Queue_buffer message_queue ->
              Lwt_pipe.Unbounded.peek_all_now message_queue
          | Bounded_buffer message_queue ->
              Lwt_pipe.Bounded.peek_all_now message_queue
        with Lwt_pipe.Closed -> []
      in
      List.map
        (function (t, Message (req, _)) -> (t, Request.view req))
        peeked

    let pending_requests_length (type a) (w : a queue t) =
      let pipe_length (type a) (q : a buffer) =
        match q with
        | Queue_buffer queue -> Lwt_pipe.Unbounded.length queue
        | Bounded_buffer queue -> Lwt_pipe.Bounded.length queue
        | Dropbox_buffer _ -> 1
      in
      pipe_length w.buffer
  end

  let close (type a) (w : a t) =
    let wakeup = function
      | (_, Message (_, Some u)) ->
          let name = Format.asprintf "%a" Name.pp w.name in
          Lwt.wakeup_later u (error (Closed {base = base_name; name}))
      | (_, Message (_, None)) -> ()
    in
    let close_queue message_queue =
      let messages = Lwt_pipe.Bounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Bounded.close message_queue
    in
    let close_unbounded_queue message_queue =
      let messages = Lwt_pipe.Unbounded.pop_all_now message_queue in
      List.iter wakeup messages ;
      Lwt_pipe.Unbounded.close message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> close_unbounded_queue message_queue
    | Bounded_buffer message_queue -> close_queue message_queue
    | Dropbox_buffer message_box ->
        (try Option.iter wakeup (Lwt_dropbox.peek message_box)
         with Lwt_dropbox.Closed -> ()) ;
        Lwt_dropbox.close message_box

  let pop (type a) (w : a t) =
    let open Lwt_syntax in
    let pop_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Bounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Bounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    let pop_unbounded_queue message_queue =
      match w.timeout with
      | None ->
          let* m = Lwt_pipe.Unbounded.pop message_queue in
          return_some m
      | Some timeout ->
          Lwt_pipe.Unbounded.pop_with_timeout
            (Systime_os.sleep timeout)
            message_queue
    in
    match w.buffer with
    | Queue_buffer message_queue -> pop_unbounded_queue message_queue
    | Bounded_buffer message_queue -> pop_queue message_queue
    | Dropbox_buffer message_box -> (
        match w.timeout with
        | None ->
            let* m = Lwt_dropbox.take message_box in
            return_some m
        | Some timeout ->
            Lwt_dropbox.take_with_timeout (Systime_os.sleep timeout) message_box
        )

  let trigger_shutdown w = Lwt.ignore_result (Lwt_canceler.cancel w.canceler)

  let canceler {canceler; _} = canceler

  let lwt_emit w (status : Logger.status) =
    let (module LogEvent) = w.logEvent in
    let time = Systime_os.now () in
    Lwt.bind
      (LogEvent.emit
         ~section:(Internal_event.Section.make_sanitized Name.base)
         (fun () -> Time.System.stamp ~time status))
      (function
        | Ok () -> Lwt.return_unit
        | Error el ->
            Format.kasprintf
              Lwt.fail_with
              "Worker_event.emit: %a"
              pp_print_trace
              el)

  let log_event w evt = lwt_emit w (Logger.WorkerEvent (evt, Event.level evt))

  let record_event w evt = Lwt.ignore_result (log_event w evt)

  module type HANDLERS = sig
    type self

    val on_launch :
      self -> Name.t -> Types.parameters -> Types.state tzresult Lwt.t

    val on_request : self -> 'a Request.t -> 'a tzresult Lwt.t

    val on_no_request : self -> unit tzresult Lwt.t

    val on_close : self -> unit Lwt.t

    val on_error :
      self ->
      Request.view ->
      Worker_types.request_status ->
      error list ->
      unit tzresult Lwt.t

    val on_completion :
      self -> 'a Request.t -> 'a -> Worker_types.request_status -> unit Lwt.t
  end

  let create_table buffer_kind =
    {buffer_kind; last_id = 0; instances = Nametbl.create ~random:true 10}

  let worker_loop (type kind) handlers (w : kind t) =
    let (module Handlers : HANDLERS with type self = kind t) = handlers in
    let do_close errs =
      let open Lwt_syntax in
      let t0 =
        match w.status with
        | Running t0 -> t0
        | Launching _ | Closing _ | Closed _ -> assert false
      in
      w.status <- Closing (t0, Systime_os.now ()) ;
      close w ;
      let* () = Error_monad.cancel_with_exceptions w.canceler in
      w.status <- Closed (t0, Systime_os.now (), errs) ;
      let* () = Handlers.on_close w in
      Nametbl.remove w.table.instances w.name ;
      w.state <- None ;
      return_unit
    in
    let rec loop () =
      (* The call to [protect] here allows the call to [pop] (responsible
         for fetching the next request) to be canceled by the use of the
         [canceler].

         These cancellations cannot affect the processing of ongoing requests.
         This is due to the limited scope of the argument of [protect]. As a
         result, ongoing requests are never canceled by this mechanism.

         In the case when the [canceler] is canceled whilst a request is being
         processed, the processing eventually resolves, at which point a
         recursive call to this [loop] at which point this call to [protect]
         fails immediately with [Canceled]. *)
      Lwt.bind
        Lwt_tzresult_syntax.(
          let* popped =
            protect ~canceler:w.canceler (fun () -> lwt_ok @@ pop w)
          in
          match popped with
          | None -> Handlers.on_no_request w
          | Some (pushed, Message (request, u)) -> (
              let current_request = Request.view request in
              let treated_time = Systime_os.now () in
              w.current_request <- Some (pushed, treated_time, current_request) ;
              match u with
              | None ->
                  let* res = Handlers.on_request w request in
                  let completed_time = Systime_os.now () in
                  let treated = Ptime.diff treated_time pushed in
                  let completed = Ptime.diff completed_time treated_time in
                  w.current_request <- None ;
                  let status = Worker_types.{pushed; treated; completed} in
                  let* () =
                    lwt_ok @@ Handlers.on_completion w request res status
                  in
                  let* () =
                    lwt_ok
                    @@ lwt_emit w (Request (current_request, status, None))
                  in
                  return_unit
              | Some u ->
                  let* res =
                    (* [res] is a promise of a result (i.e., it is within the
                       LwtResult combined monad. But the side effect [wakeup]
                       needs to happen regardless of success (Ok) or failure
                       (Error). To that end, we treat it locally like a regular
                       promise (which happens to carry a [result]) within the Lwt
                       monad. *)
                    let open Lwt_syntax in
                    let* res = Handlers.on_request w request in
                    Lwt.wakeup_later u res ;
                    return res
                  in
                  let completed_time = Systime_os.now () in
                  let treated = Ptime.diff treated_time pushed in
                  let completed = Ptime.diff completed_time treated_time in
                  let status = Worker_types.{pushed; treated; completed} in
                  w.current_request <- None ;
                  let* () =
                    lwt_ok @@ Handlers.on_completion w request res status
                  in
                  let* () =
                    lwt_ok
                    @@ lwt_emit w (Request (current_request, status, None))
                  in
                  return_unit))
        Lwt_syntax.(
          function
          | Ok () -> loop ()
          | Error (Canceled :: _)
          | Error (Exn Lwt.Canceled :: _)
          | Error (Exn Lwt_pipe.Closed :: _)
          | Error (Exn Lwt_dropbox.Closed :: _) ->
              let* () = lwt_emit w Terminated in
              do_close None
          | Error errs -> (
              let* r =
                match w.current_request with
                | Some (pushed, treated_time, request) ->
                    let completed_time = Systime_os.now () in
                    let treated = Ptime.diff treated_time pushed in
                    let completed = Ptime.diff completed_time treated_time in
                    w.current_request <- None ;
                    Handlers.on_error
                      w
                      request
                      Worker_types.{pushed; treated; completed}
                      errs
                | None -> assert false
              in
              match r with
              | Ok () -> loop ()
              | Error (Timeout :: _ as errs) ->
                  let* () = lwt_emit w Terminated in
                  do_close (Some errs)
              | Error errs ->
                  let* () = lwt_emit w (Crashed errs) in
                  do_close (Some errs)))
    in
    loop ()

  let launch :
      type kind.
      kind table ->
      ?timeout:Time.System.Span.t ->
      Name.t ->
      Types.parameters ->
      (module HANDLERS with type self = kind t) ->
      kind t tzresult Lwt.t =
   fun table ?timeout name parameters (module Handlers) ->
    let name_s = Format.asprintf "%a" Name.pp name in
    let full_name =
      if name_s = "" then base_name
      else Format.asprintf "%s_%s" base_name name_s
    in
    if Nametbl.mem table.instances name then
      invalid_arg
        (Format.asprintf "Worker.launch: duplicate worker %s" full_name)
    else
      let id =
        table.last_id <- table.last_id + 1 ;
        table.last_id
      in
      let id_name =
        if name_s = "" then base_name else Format.asprintf "%s_%d" base_name id
      in
      let canceler = Lwt_canceler.create () in
      let buffer : kind buffer =
        match table.buffer_kind with
        | Queue -> Queue_buffer (Lwt_pipe.Unbounded.create ())
        | Bounded {size} ->
            Bounded_buffer
              (Lwt_pipe.Bounded.create
                 ~max_size:size
                 ~compute_size:(fun _ -> 1)
                 ())
        | Dropbox _ -> Dropbox_buffer (Lwt_dropbox.create ())
      in
      let w =
        {
          parameters;
          name;
          canceler;
          table;
          buffer;
          state = None;
          id;
          worker = Lwt.return_unit;
          timeout;
          current_request = None;
          logEvent = (module Logger.LogEvent);
          status = Launching (Systime_os.now ());
        }
      in
      Nametbl.add table.instances name w ;
      let open Lwt_tzresult_syntax in
      let started = if id_name = base_name then None else Some name_s in
      let* () = lwt_ok @@ lwt_emit w (Started started) in
      let* state = Handlers.on_launch w name parameters in
      w.status <- Running (Systime_os.now ()) ;
      w.state <- Some state ;
      w.worker <-
        Lwt_utils.worker
          full_name
          ~on_event:Internal_event.Lwt_worker_event.on_event
          ~run:(fun () -> worker_loop (module Handlers) w)
          ~cancel:(fun () -> Error_monad.cancel_with_exceptions w.canceler) ;
      return w

  let shutdown w =
    (* The actual cancellation ([Lwt_canceler.cancel w.canceler]) resolves
       immediately because no hooks are registered on the canceler. However, the
       worker ([w.worker]) resolves only once the ongoing request has resolved
       (if any) and some clean-up operations have completed. *)
    let open Lwt_syntax in
    let* () = lwt_emit w Triggering_shutdown in
    let* () = Error_monad.cancel_with_exceptions w.canceler in
    w.worker

  let state w =
    match (w.state, w.status) with
    | (None, Launching _) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called before worker was initialized"
             base_name
             Name.pp
             w.name)
    | (None, (Closing _ | Closed _)) ->
        invalid_arg
          (Format.asprintf
             "Worker.state (%s[%a]): state called after worker was terminated"
             base_name
             Name.pp
             w.name)
    | (None, _) -> assert false
    | (Some state, _) -> state

  let pending_requests q = Queue.pending_requests q

  let status {status; _} = status

  let current_request {current_request; _} = current_request

  let information (type a) (w : a t) =
    {
      Worker_types.instances_number = Nametbl.length w.table.instances;
      wstatus = w.status;
      queue_length =
        (match w.buffer with
        | Queue_buffer pipe -> Lwt_pipe.Unbounded.length pipe
        | Bounded_buffer pipe -> Lwt_pipe.Bounded.length pipe
        | Dropbox_buffer _ -> 1);
    }

  let list {instances; _} =
    Nametbl.fold (fun n w acc -> (n, w) :: acc) instances []

  let find_opt {instances; _} = Nametbl.find instances

  (* TODO? add a list of cancelers for nested protection ? *)
  let protect {canceler; _} ?on_error f = protect ?on_error ~canceler f

  let () =
    Internal_event.register_section
      (Internal_event.Section.make_sanitized Name.base)
end
back to top