Raw File
lwt_pipe.mli
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(* Copyright (c) 2021 Nomadic Labs <contact@nomadic-labs.com>                *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

exception Closed

module Bounded : sig
  (** Data queues similar to the [Pipe] module in Jane Street's [Async]
    library. They are implemented with [Queue]s, limited in size, and
    use Lwt primitives for concurrent access. *)

  (** Type of queues holding values of type ['a]. *)
  type 'a t

  (** [create ~max_size ~compute_size ()] is an empty queue that can hold at
      most [max_size] "bytes" of data, using [compute_size] to compute the
      number of "bytes" in a datum.

      Note that you can use [max_size]/[compute_size] to actually limit the size
      in byte (i.e., the memory footprint of the structure (in this case,
      consider using {!push_overhead} to account for the boilerplate memory),
      but you can also use [max_size]/[compute_size] to limit the footprint of
      the structure for some other resource. E.g., you can spin up tasks in
      separate processes and limit the number of concurrently running processes.

      Also note that the size bound is not inclusive. So with [max_size] set to
      [2] and [compute_size] to [fun _ -> 1] you can add one (1) element and
      then the pipe is full. (It is full because adding any other element would
      take the total size to [2] which is not strictly smaller than the
      [max_size] bound.)

      Finally, note that when the pipe is empty, inserting an element will
      always succeed immediately, even if its size exceed the size bound. For
      the same reason, inserting an element bigger than the size bound will
      eventually succeeds, but only once the pipe has been emptied. Once such an
      element has been inserted, no other element can be pushed until the
      bigger-than-bound element is popped. This behaviour breaks the invariant
      guaranteed by the module, but is kept for backwards compatibility. It may
      be changed in the future, but only after a careful review of the potential
      impact. *)
  val create : max_size:int -> compute_size:('a -> int) -> unit -> 'a t

  (** [push q v] is a promise that is pending until there is enough space in [q]
    to accommodate [v]. When this happens [v] is added to the end of [q] and the
    promise resolves.

    If there is enough space in [q] to accommodate [v] when the call is made,
    then the [v] is added immediately and an already resolved promise is
    returned.

    Note that if several writes are stuck because the pipe is full. These
    writes will succeed in an order that might be different from the order the
    write attempts were made. Specifically, when pushing elements of different
    computed sizes, smaller pushes may be resolved earlier if enough space is
    freed.

    @raise {!Closed} if [q] is closed. More specifically, the promise is
    rejected with {!Closed} if [q] is closed. *)
  val push : 'a t -> 'a -> unit Lwt.t

  (** [pop q] is a promise that is pending until there is an element in [q]. When
    this happens an element is removed and the promise is fulfilled with it.

    If there is already an element in [q] when the call is made, the element is
    removed immediately and an already resolved promise is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop : 'a t -> 'a Lwt.t

  (** [pop_with_timeout t q] is a promise that behaves similarly to [pop q]
    except that it resolves with [None] if [t] resolves before there is an
    element in [q] to pop.

    Note that there can be multiple promises that are awaiting for an element to
    pop from the queue. As a result, it is possible that [pop_with_timeout] is
    fulfilled with [None] even though values have been pushed to the [q].

    [t] is canceled (i.e., it fails with [Canceled]) if an element is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop_with_timeout : unit Lwt.t -> 'a t -> 'a option Lwt.t

  (** [pop_all q] is a promise that is pending until there is at least one element
    in [q]. When this happens, all the elements of [q] are removed and the
    promise is fulfilled with the list of elements (in the order in which they
    were inserted).

    If there is already one or more elements in [q] when the call is made, the
    elements are removed immediately and an already resolved promise is
    returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed.

    In practice, this function returns a promise that either:
    - is pending and will resolve with a single-element list,
    - is already resolved with a list of at least one element,
    - or will be rejected with {!Closed}. *)
  val pop_all : 'a t -> 'a list Lwt.t

  (** [pop_all_now q] removes and returns all the elements in [q] (in the order in
    which they were inserted). If [q] is empty, [[]] is returned.

    @raise {!Closed} if [q] is empty and closed. *)
  val pop_all_now : 'a t -> 'a list

  (** [peek q] returns the same value as [pop q] but does not remove the returned
    element.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val peek : 'a t -> 'a Lwt.t

  (** [peek_all_now q] returns the elements in the [q] (oldest first), or [[]]
      if empty. It does not remove elements from [q].

      @raise {!Closed} if [q] is empty and closed. *)
  val peek_all_now : 'a t -> 'a list

  (** [push_now q v] either
      - adds [v] at the ends of [q] immediately and returns [true], or
      - if [q] is full, returns [false].

      @raise Closed if [q] is closed. *)
  val push_now : 'a t -> 'a -> bool

  (** [pop_now q] may remove and return the first element in [q] if
    [q] contains at least one element.

    @raise Closed if [q] is closed. *)
  val pop_now : 'a t -> 'a option

  (** [length q] is the number of elements in [q]. *)
  val length : 'a t -> int

  (** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
  val is_empty : 'a t -> bool

  (** [close q] the write-end of [q]:

    - Pending and future write attempts will fail with {!Closed}.
    - If there is data left in the pipe, then future read attempts will be
      resolved until the remaining data is drained, after which further reads
      will fail with {!Closed}.
    - If there is no data left in the pipe, then pending and future reads will
      fail with {!Closed}.

    The [close] function is idempotent. *)
  val close : 'a t -> unit

  (** [is_closed q] is [true] if [close q] has been called.
    It is [false] otherwise. *)
  val is_closed : 'a t -> bool

  (** The number of bytes used in the internal representation to hold an element
    in the queue. *)
  val push_overhead : int
end

module Unbounded : sig
  (** [Unbounded] is a variant of {!Bounded} where there is no size
    limit. It is equivalent to using {!Bounded} with
    [compute_size = Fun.const 0] but some functions are specialised to this
    particular setup. *)

  (** Type of queues holding values of type ['a]. *)
  type 'a t

  (** [create ()] is an empty unbounded queue. *)
  val create : unit -> 'a t

  (** [push q v]: [v] is added immediately to [q].

      Note that pushing never needs to wait for room to be available inside the
      pipe. As a result, [push] returns [unit]. This is unlike {!Bounded.push},
      where {!Bounded.push} may need to wait for some space to be freed inside
      the pipe and thus returns [unit Lwt.t].

      @raise {!Closed} if [q] is closed. *)
  val push : 'a t -> 'a -> unit

  (** [pop q] is a promise that is pending until there is at least one element in
    [q]. When this happens an element is removed and the promise is fulfilled
    with it.

    If there is already an element in [q] when the call is made, the element is
    removed immediately and an already resolved promise is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop : 'a t -> 'a Lwt.t

  (** [pop_with_timeout t q] is a promise that behaves similarly to [pop q]
    except that it resolves with [None] if [t] resolves before there is an
    element in [q] to pop.

    Note that there can be multiple promises that are awaiting for an element to
    pop from the queue. As a result, it is possible that [pop_with_timeout] is
    fulfilled with [None] even though values have been pushed to the [q].

    [t] is canceled (i.e., it fails with [Canceled]) if an element is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop_with_timeout : unit Lwt.t -> 'a t -> 'a option Lwt.t

  (** [pop_all q] is a promise that is pending until there is at least one element
    in [q]. When this happens, all the elements of [q] are removed and the
    promise is fulfilled with the list of elements (in the order in which they
    were inserted).

    If there is already one or more elements in [q] when the call is made, the
    elements are removed immediately and an already resolved promise is
    returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed.

    In practice, this function returns a promise that either:
    - is pending and will resolve with a single-element list,
    - is already resolved with a list of at least one element,
    - or will be rejected with {!Closed}. *)
  val pop_all : 'a t -> 'a list Lwt.t

  (** [pop_all_now q] removes and returns all the elements in [q] (in the order in
    which they were inserted). If [q] is empty, [[]] is returned.

    @raise {!Closed} if [q] is empty and closed. *)
  val pop_all_now : 'a t -> 'a list

  (** [peek q] returns the same value as [pop q] but does not remove the returned
    element.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val peek : 'a t -> 'a Lwt.t

  (** [peek_all_now q] returns the elements in the [q] (oldest first), or [[]]
      if empty. It does not remove elements from [q].

      @raise {!Closed} if [q] is empty and closed. *)
  val peek_all_now : 'a t -> 'a list

  (** [pop_now q] may remove and return the first element in [q] if
    [q] contains at least one element.

    @raise Closed if [q] is closed. *)
  val pop_now : 'a t -> 'a option

  (** [length q] is the number of elements in [q]. *)
  val length : 'a t -> int

  (** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
  val is_empty : 'a t -> bool

  (** [close q] the write-end of [q]:

    - Future write attempts will fail with {!Closed}.
    - If there is data left in the pipe, then future read attempts will be
      resolved until the remaining data is drained, after which further reads
      will fail with {!Closed}.
    - If there is no data left in the pipe, then pending and future reads will
      fail with {!Closed}.

    The [close] function is idempotent. *)
  val close : 'a t -> unit

  (** [is_closed q] is [true] if [close q] has been called.
    It is [false] otherwise. *)
  val is_closed : 'a t -> bool
end

module Maybe_bounded : sig
  (** [Maybe_bounded] are pipes that may or may not be bounded. This is decided
       when [create]ing the pipe and can be queried with the [bounded] function.
    *)

  (** Type of queues holding values of type ['a]. *)
  type 'a t

  (** [create ~bound:(max_size, compute_size) ()] is an empty queue that can
      hold max [bound] "bytes" of data, using [compute_size] to compute the
      number of "bytes" in a datum. I.e., it is equivalent to
      [Bounded.create ~max_size ~compute_size] but the functions below make no
      assumptions about the bound leading to a slightly different interface and
      potentially worse performances.

      [create ()], with the [bound] argument no set, is an empty queue that is
      unbounded. I.e., it is equivalent to [Unbounded.create ()] but the
      functions below make no assumptions about the bound leading to a slightly
      different interface and potentially worse performances. *)
  val create : ?bound:int * ('a -> int) -> unit -> 'a t

  (** [bounded t] is [true] iff [t] was [create]d with a set [bound]. *)
  val bounded : 'a t -> bool

  (** [push q v] is a promise that is pending until there is enough space in [q]
    to accommodate [v]. When this happens [v] is added to the end of [q] and the
    promise resolves.

    If there is enough space in [q] to accommodate [v] when the call is made,
    then the [v] is added immediately and an already resolved promise is
    returned.

    Note that if several writes are stuck because the pipe is full. These
    writes will succeed in an order that might be different from the order the
    write attempts were made. Specifically, when pushing elements of different
    computed sizes, smaller pushes may be resolved earlier if enough space is
    freed.

    @raise {!Closed} if [q] is closed. More specifically, the promise is
    rejected with {!Closed} if [q] is closed. *)
  val push : 'a t -> 'a -> unit Lwt.t

  (** [pop q] is a promise that is pending until there is an element in [q]. When
    this happens an element is removed and the promise is fulfilled with it.

    If there is already an element in [q] when the call is made, the element is
    removed immediately and an already resolved promise is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop : 'a t -> 'a Lwt.t

  (** [pop_with_timeout t q] is a promise that behaves similarly to [pop q]
    except that it resolves with [None] if [t] resolves before there is an
    element in [q] to pop.

    Note that there can be multiple promises that are awaiting for an element to
    pop from the queue. As a result, it is possible that [pop_with_timeout] is
    fulfilled with [None] even though values have been pushed to the [q].

    [t] is canceled (i.e., it fails with [Canceled]) if an element is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop_with_timeout : unit Lwt.t -> 'a t -> 'a option Lwt.t

  (** [pop_all q] is a promise that is pending until there is an element in [q].
    When this happens, all the elements of [q] are removed and the promise is
    fulfilled with the list of elements (in the order in which they were
    inserted).

    If there is already an element in [q] when the call is made, the elements
    are removed immediately and an already resolved promise is returned.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val pop_all : 'a t -> 'a list Lwt.t

  (** [pop_all_now q] removes and returns all the elements in [q] (in the order in
    which they were inserted). If [q] is empty, [[]] is returned.

    @raise {!Closed} if [q] is empty and closed. *)
  val pop_all_now : 'a t -> 'a list

  (** [peek q] returns the same value as [pop q] but does not remove the returned
    element.

    @raise {!Closed} if [q] is empty and closed. More specifically, the promise
    is rejected with {!Closed} if [q] is empty and closed. *)
  val peek : 'a t -> 'a Lwt.t

  (** [peek_all_now q] returns the elements in the [q] (oldest first), or [[]]
      if empty. It does not remove elements from [q].

      @raise {!Closed} if [q] is empty and closed. *)
  val peek_all_now : 'a t -> 'a list

  (** [push_now q v] either
      - adds [v] at the ends of [q] immediately and returns [true], or
      - if [q] is full, returns [false].

      @raise Closed if [q] is closed. *)
  val push_now : 'a t -> 'a -> bool

  (** [pop_now q] may remove and return the first element in [q] if
    [q] contains at least one element.

    @raise Closed if [q] is closed. *)
  val pop_now : 'a t -> 'a option

  (** [length q] is the number of elements in [q]. *)
  val length : 'a t -> int

  (** [is_empty q] is [true] if [q] is empty, [false] otherwise. *)
  val is_empty : 'a t -> bool

  (** [close q] the write-end of [q]:

    - Pending and future write attempts will fail with {!Closed}.
    - If there is data left in the pipe, then future read attempts will be
      resolved until the remaining data is drained, after which further reads
      will fail with {!Closed}.
    - If there is no data left in the pipe, then pending and future reads will
      fail with {!Closed}.

    The [close] function is idempotent. *)
  val close : 'a t -> unit

  val is_closed : 'a t -> bool

  (** The number of bytes used in the internal representation to hold an element
    in the queue. *)
  val push_overhead : int
end
back to top