(* Raw File: file_event_sink.ml *)
(******************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com>     *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

open Error_monad

module Micro_seconds : sig
  (** Module with time-stamps with “at least micro-seconds” precision.
      A time-stamp is a number of seconds stored as a [float]. *)
  type t = private float

  (** [now ()] is the current time as given by [Unix.gettimeofday]. *)
  val now : unit -> t

  (** [of_float f] uses [f] as a time-stamp, unchanged. *)
  val of_float : float -> t

  (** Encoding as an [int64] number of micro-seconds. *)
  val encoding : t Data_encoding.t

  (** [date_string t] is the pair [(date, time)] computed with [Unix.gmtime]:
      [date] is formatted as [YYYYMMDD] and [time] as [HHMMSS-uuuuuu], where
      [uuuuuu] is the fractional part of [t] in micro-seconds. *)
  val date_string : t -> string * string
end = struct
  (* Time.t is in seconds, we want more precision. *)
  type t = float

  let micro_seconds_per_second = 1_000_000.

  let now = Unix.gettimeofday

  let of_float seconds = seconds

  let encoding =
    (* Cf. https://github.com/OCamlPro/ocplib-json-typed/issues/25 *)
    Data_encoding.(
      conv
        (fun seconds -> Int64.of_float (seconds *. micro_seconds_per_second))
        (fun micros -> Int64.to_float micros /. micro_seconds_per_second)
        int64)

  let date_string time_value =
    let {Unix.tm_year; tm_mon; tm_mday; tm_hour; tm_min; tm_sec; _} =
      Unix.gmtime time_value
    in
    (* Fractional part of the second, truncated to whole micro-seconds. *)
    let micros =
      int_of_float
        ((time_value -. floor time_value) *. micro_seconds_per_second)
    in
    let date =
      Printf.sprintf "%04d%02d%02d" (1900 + tm_year) (tm_mon + 1) tm_mday
    in
    let time =
      Printf.sprintf "%02d%02d%02d-%06d" tm_hour tm_min tm_sec micros
    in
    (date, time)
end

module Event_filter = struct
  (** Boolean predicate over an event, evaluated by [run] from the event
      section, level and name. *)
  type t =
    | True
    | False
    | Or of t list
    | And of t list
    | Name of string
    | Name_matches of Re.re
    | Level_in of Internal_event.level list
    | Section_in of Internal_event.Section.t list

  (** [run ~section ~level ~name filter] evaluates [filter]. [Or l] holds when
      at least one element of [l] holds (so [Or []] is false) and [And l] when
      all of them hold (so [And []] is true). *)
  let rec run ~section ~level ~name filter =
    let continue = run ~section ~level ~name in
    match filter with
    | True -> true
    | False -> false
    | Or l -> List.exists continue l
    | And l -> List.for_all continue l
    | Name s -> String.equal s name
    | Name_matches re -> Re.execp re name
    | Level_in l -> List.mem ~equal:Internal_event.Level.equal level l
    | Section_in l -> List.mem ~equal:Internal_event.Section.equal section l

  (** Pretty-printer using a parenthesized prefix notation. Every opening
      parenthesis is closed, including in the [Or] and [And] cases. *)
  let rec pp fmt filter =
    let open Format in
    match filter with
    | True -> pp_print_string fmt "true"
    | False -> pp_print_string fmt "false"
    | Or l ->
        fprintf
          fmt
          "(or@ @[<2>%a@])"
          (pp_print_list ~pp_sep:(fun fmt () -> fprintf fmt "@ ") pp)
          l
    | And l ->
        fprintf
          fmt
          "(and@ @[<2>%a@])"
          (pp_print_list ~pp_sep:(fun fmt () -> fprintf fmt "@ ") pp)
          l
    | Name s -> fprintf fmt "(name-is@ %S)" s
    | Name_matches re -> fprintf fmt "(name-matches@ %a)" Re.pp_re re
    | Level_in l ->
        fprintf
          fmt
          "(level-in@ [%s])"
          (String.concat "," (List.map Internal_event.Level.to_string l))
    | Section_in l ->
        fprintf
          fmt
          "(section-in@ [%a])"
          (pp_print_list
             ~pp_sep:(fun fmt () -> fprintf fmt ",@ ")
             (fun fmt s -> fprintf fmt "(Some %a)" Internal_event.Section.pp s))
          l
    [@@warning "-32"]

  (* -> The "unused value" warning. *)

  (** The filter accepting every event. *)
  let t = True

  (** The filter rejecting every event. *)
  let f = False [@@warning "-32"]

  (* -> The "unused value" warning. *)

  (** [any l] holds when at least one filter of [l] holds. *)
  let any l = Or l

  (** [all l] holds when every filter of [l] holds. *)
  let all l = And l [@@warning "-32"]

  (* -> The "unused value" warning. *)

  (** [name_is s] holds for events whose name is exactly [s]. *)
  let name_is s = Name s

  (** [name_matches re] holds for events whose name is matched by [re]. *)
  let name_matches s = Name_matches s

  (** Same as [name_matches], compiling [s] with [Re.Posix.compile_pat]. *)
  let name_matches_posix s = name_matches (Re.Posix.compile_pat s)

  (** [level_in l] holds for events whose level is a member of [l]. *)
  let level_in l = Level_in l

  (** [section_in l] holds for events whose section is a member of [l]. *)
  let section_in l = Section_in l

  (* Levels in the order used by [level_at_least]: a level counts as
     at least [lvl] when it is [lvl] or comes after it in this list. *)
  let levels_in_order =
    Internal_event.[Debug; Info; Notice; Warning; Error; Fatal]

  (** [level_at_least lvl] is the [Level_in] filter made of [lvl] and of every
      level that follows it in [levels_in_order].
      @raise Failure if [lvl] does not appear in [levels_in_order]. *)
  let level_at_least lvl =
    List.fold_left
      (fun acc l ->
        match acc with
        (* Nothing is collected until [lvl] itself is reached. *)
        | [] -> if Internal_event.Level.equal l lvl then [l] else []
        (* Once [lvl] has been seen, every later level is kept. *)
        | _ :: _ as acc -> l :: acc)
      []
      levels_in_order
    |> function
    | [] -> raise (Failure "level_at_least not found")
    | _ :: _ as levels -> level_in levels
end

(** State of one configured file sink, as built by
    [Sink_implementation.configure]. *)
type t = {
  (* Path component of the configuration URI; [handle] writes event files in
     sub-directories of this path. *)
  path : string;
  (* Hopefully temporary hack to handle event which are emitted with
     the non-cooperative log functions in `Legacy_logging`: *)
  (* Pairs of (file path, JSON) that [handle] has started to write and has not
     yet removed from the list; [close] writes whatever is still listed. *)
  lwt_bad_citizen_hack : (string * Data_encoding.json) list ref;
  (* Filter evaluated by [should_handle] to decide whether an event is
     accepted by this sink. *)
  event_filter : Event_filter.t;
}

(** An event together with the meta-data stored next to it in each file:
    the time-stamp taken when the event was handled and its section. *)
type 'event wrapped = {
  time_stamp : Micro_seconds.t;
  section : Internal_event.Section.t;
  event : 'event;
}

(** [wrap time_stamp section event] builds the corresponding [wrapped]
    record. *)
let wrap time_stamp section event = {time_stamp; section; event}

(** [wrapped_encoding event_encoding] is the versioned encoding (named
    ["file-event-sink-item"]) of a [wrapped] event: an object with the fields
    ["time_stamp"], ["section"] and ["event"], the latter encoded with
    [event_encoding]. *)
let wrapped_encoding event_encoding =
  let open Data_encoding in
  let to_tuple {time_stamp; section; event} = (time_stamp, section, event) in
  let of_tuple (time_stamp, section, event) = {time_stamp; section; event} in
  let fields =
    obj3
      (req "time_stamp" Micro_seconds.encoding)
      (req "section" Internal_event.Section.encoding)
      (req "event" event_encoding)
  in
  let v0 = conv to_tuple of_tuple fields in
  With_version.(encoding ~name:"file-event-sink-item" (first_version v0))

module Section_dir = struct
  (** [of_section section] is the directory name used for [section]: the
      output of [Internal_event.Section.pp]. *)
  let of_section (section : Internal_event.Section.t) : string =
    Format.asprintf "%a" Internal_event.Section.pp section

  (** [section_name dir] interprets a directory name: ["no-section"] gives
      [Ok None], a name starting with ["section-"] gives [Ok (Some rest)]
      where [rest] is the name without that prefix, and anything else gives
      [Error "wrong-dir-name"].

      NOTE(review): whether the names produced by [of_section] have one of
      the two accepted shapes depends on [Internal_event.Section.pp], which is
      not visible from this file -- to be confirmed. *)
  let section_name dir_name =
    if String.equal dir_name "no-section" then Ok None
    else
      match TzString.remove_prefix ~prefix:"section-" dir_name with
      | Some suffix -> Ok (Some suffix)
      | None -> Error "wrong-dir-name"
end

module Sink_implementation : Internal_event.SINK with type t = t = struct
  type nonrec t = t

  let uri_scheme = "unix-files"

  (* [configure uri] builds a sink from [uri]. The path of [uri] becomes the
     root directory of the sink and the query parameters define the event
     filter:
     - "level-at-least": events at that level or above (ignored when the
       value is not recognized by [Internal_event.Level.of_string]);
     - "section" (repeatable): dot-separated section names;
     - "no-section" set to "true": events with the empty section;
     - "name-matches" (repeatable): POSIX regular expressions on the name;
     - "name" (repeatable): exact event names.
     An event is accepted when any of the resulting filters accepts it; with
     no filtering parameter at all, every event is accepted. *)
  let configure uri =
    let query_values key =
      Option.value ~default:[] (Uri.get_query_param' uri key)
    in
    let regexps = query_values "name-matches" in
    let exact_names = query_values "name" in
    let level_filters =
      let requested =
        let open Option_syntax in
        let* raw = Uri.get_query_param uri "level-at-least" in
        let* level = Internal_event.Level.of_string raw in
        return (Event_filter.level_at_least level)
      in
      Option.to_list requested
    in
    let section_filters =
      let named =
        match Uri.get_query_param' uri "section" with
        | None -> []
        | Some dotted_names ->
            List.map
              (fun s ->
                Internal_event.Section.make_sanitized
                  (String.split_on_char '.' s))
              dotted_names
      in
      let unnamed =
        match Uri.get_query_param uri "no-section" with
        | Some "true" -> [Internal_event.Section.empty]
        | Some _ | None -> []
      in
      (* All requested sections end up in a single [section_in] filter. *)
      match named @ unnamed with
      | [] -> []
      | _ :: _ as wanted -> [Event_filter.section_in wanted]
    in
    let event_filter =
      let open Event_filter in
      match
        level_filters @ section_filters
        @ List.map name_matches_posix regexps
        @ List.map name_is exact_names
      with
      | [] -> t
      | _ :: _ as alternatives -> any alternatives
    in
    Lwt.return_ok
      {path = Uri.path uri; lwt_bad_citizen_hack = ref []; event_filter}

  (* [output_json ~pp file_path event_json] creates the parent directory of
     [file_path] (with permissions 0o700) and writes [event_json] there.
     Both a write error and any exception raised on the way are turned into
     an error whose message starts with the output of [pp], which describes
     what was being handled. *)
  let output_json ~pp file_path event_json =
    let open Lwt_syntax in
    let write () =
      let parent_dir = Filename.dirname file_path in
      let* () = Lwt_utils_unix.create_dir ~perm:0o700 parent_dir in
      let* outcome = Lwt_utils_unix.Json.write_file file_path event_json in
      match outcome with
      | Ok () -> return_ok_unit
      | Error trace ->
          failwith
            "ERROR while Handling %a,@ cannot write JSON to %s:@ %a\n%!"
            pp
            ()
            file_path
            Error_monad.pp_print_trace
            trace
    in
    let on_exception exn =
      failwith "ERROR while Handling %a: %s\n%!" pp () (Printexc.to_string exn)
    in
    Lwt.catch write on_exception

  (* [should_handle ?section sink definition] tells whether the filter of
     [sink] accepts events of [definition] (judged on its level and name)
     emitted in [section], which defaults to the empty section. *)
  let should_handle (type a) ?(section = Internal_event.Section.empty) sink
      definition =
    let module Definition =
      (val definition : Internal_event.EVENT_DEFINITION with type t = a)
    in
    let {event_filter; _} = sink in
    Event_filter.run
      event_filter
      ~name:Definition.name
      ~level:Definition.level
      ~section

  (* [handle sink m ?section event] serializes [event] (wrapped with the
     current time-stamp and [section], which defaults to the empty section)
     and writes it to its own JSON file:
     [path/section_dir/M.name/date/time/date_time_tag.json]. *)
  let handle (type a) {path; lwt_bad_citizen_hack; _} m
      ?(section = Internal_event.Section.empty) (event : a) =
    let open Lwt_result_syntax in
    let module M = (val m : Internal_event.EVENT_DEFINITION with type t = a) in
    let now = Micro_seconds.now () in
    let date, time = Micro_seconds.date_string now in
    let event_json =
      Data_encoding.Json.construct
        (wrapped_encoding M.encoding)
        (wrap now section event)
    in
    (* The tag is the first 8 hexadecimal characters of the digest of the
       marshalled JSON; it distinguishes the file names of events sharing the
       same date and time strings. *)
    let tag =
      let hash =
        Marshal.to_string event_json [] |> Digest.string |> Digest.to_hex
      in
      String.sub hash 0 8
    in
    let section_dir = Section_dir.of_section section in
    let dir_path =
      List.fold_left Filename.concat path [section_dir; M.name; date; time]
    in
    let file_path =
      Filename.concat dir_path (Printf.sprintf "%s_%s_%s.json" date time tag)
    in
    (* Register the pending write before starting it, so that [close] can
       still output the event if the promise below is never waited for. *)
    lwt_bad_citizen_hack := (file_path, event_json) :: !lwt_bad_citizen_hack ;
    let* () =
      output_json file_path event_json ~pp:(fun fmt () ->
          M.pp ~short:false fmt event)
    in
    (* Only reached when the write succeeded: the entry is then dropped.
       NOTE(review): when [output_json] returns an error the entry stays in
       the list, so [close] will try to write it again -- confirm that this
       is intended. *)
    lwt_bad_citizen_hack :=
      List.filter (fun (f, _) -> f <> file_path) !lwt_bad_citizen_hack ;
    return_unit

  (* [close sink] writes, one after the other, every event still registered
     in the pending list of [sink]; it stops at the first failing write. *)
  let close {lwt_bad_citizen_hack = pending; _} =
    let flush_pending (file_path, event_json) =
      let pp fmt () = Format.fprintf fmt "Destacking: %s" file_path in
      output_json ~pp file_path event_json
    in
    List.iter_es flush_pending !pending
end

let () = Internal_event.All_sinks.register (module Sink_implementation)

open Sink_implementation

module Query = struct
  (* [with_file_kind dir p] calls [Lwt_unix.stat] on the entry [p] of [dir]
     and tags [p] with what it found: [`Directory p], [`Regular_file p], or
     [`Special (kind, p)] for every other file kind. A failing [stat] is
     returned as an error. *)
  let with_file_kind dir p =
    let open Lwt_result_syntax in
    let stat_kind () =
      let*! stats = Lwt_unix.stat (Filename.concat dir p) in
      return stats.Lwt_unix.st_kind
    in
    let* kind = protect stat_kind in
    let classified =
      match kind with
      | Unix.S_DIR -> `Directory p
      | Unix.S_REG -> `Regular_file p
      | (Unix.S_CHR | Unix.S_BLK | Unix.S_LNK | Unix.S_FIFO | Unix.S_SOCK) as
        special ->
          `Special (special, p)
    in
    return classified

  (* [fold_directory path ~init ~f] folds [f] over the entries of the
     directory [path], each entry being classified by [with_file_kind].
     [init] and the results of [f] are promises of results; the fold stops at
     the first error.

     The directory handle is closed on every exit path: normal completion,
     error result, or exception. A failure to close is reported as an error
     only when the fold itself succeeded, so that it never hides the original
     failure. *)
  let fold_directory path ~init ~f =
    let open Lwt_result_syntax in
    let* dirhandle =
      protect (fun () -> Lwt_result.ok @@ Lwt_unix.opendir path)
    in
    let close_dirhandle () =
      protect (fun () -> Lwt_result.ok @@ Lwt_unix.closedir dirhandle)
    in
    let rec iter prev =
      let* opt =
        protect (fun () ->
            Lwt.catch
              (fun () ->
                let* d = Lwt_result.ok @@ Lwt_unix.readdir dirhandle in
                let* wk = with_file_kind path d in
                return_some wk)
              (function
                (* [End_of_file] signals that every entry has been read. *)
                | End_of_file -> return_none
                | (e : exn) ->
                    failwith
                      "ERROR while folding %s: %s"
                      path
                      (Printexc.to_string e)))
      in
      let* p = prev in
      match opt with Some more -> iter (f p more) | None -> prev
    in
    Lwt.try_bind
      (fun () -> iter init)
      (fun outcome ->
        let*! closed = close_dirhandle () in
        match (outcome, closed) with
        | Ok _, Error trace -> Lwt.return (Error trace)
        | (Ok _ | Error _), _ -> Lwt.return outcome)
      (fun exn ->
        (* Release the handle, then let the original exception through. *)
        let*! _closed = close_dirhandle () in
        Lwt.fail exn)

  open TzFilename.Infix

  module Time_constraint = struct
    (** Comparison operators: the tested value is compared to the constant of
        the constraint, e.g. [`Lt] means "tested value < constant". *)
    type op = [`Lt | `Le | `Ge | `Gt]

    (** Constraints on the date and time strings produced by
        [Micro_seconds.date_string]. [`Date] terms only look at the date
        string, [`Time] terms only at the time string; [`All] accepts
        everything. *)
    type t =
      [ `Date of op * float
      | `Time of op * float
      | `And of t * t
      | `Or of t * t
      | `All ]

    (* [check_logic check_terminal t string] evaluates the boolean structure
       of [t], delegating the [`Date] and [`Time] terms to [check_terminal].
       [string] is only threaded through the recursion: [check_terminal] is
       expected to already know which string is being tested. *)
    let rec check_logic check_terminal (t : t) string =
      let continue = check_logic check_terminal in
      match t with
      | `All -> true
      | `And (a, b) -> continue a string && continue b string
      | `Or (a, b) -> continue a string || continue b string
      | (`Date _ | `Time _) as term -> check_terminal term

    (* [op_with_string op a b] takes the constant of the constraint as [a]
       and the tested string as [b], hence the seemingly reversed
       comparisons: [b < a] is [String.compare a b > 0]. *)
    let op_with_string = function
      | `Lt -> fun a b -> String.compare a b > 0
      | `Gt -> fun a b -> String.compare a b < 0
      | `Le -> fun a b -> String.compare a b >= 0
      | `Ge -> fun a b -> String.compare a b <= 0

    (** [check_date t date_string] tells whether [date_string] satisfies the
        [`Date] terms of [t]; [`Time] terms are considered satisfied. *)
    let check_date (t : t) date_string =
      check_logic
        (function
          | `Date (op, f) ->
              let s = Micro_seconds.(date_string (of_float f) |> fst) in
              op_with_string op s date_string
          | `Time _ -> true)
        t
        date_string

    (** [check_time t string] tells whether the time string [string]
        satisfies the [`Time] terms of [t]; [`Date] terms are considered
        satisfied. *)
    let check_time (t : t) string =
      check_logic
        (function
          | `Time (op, f) ->
              let s = Micro_seconds.(date_string (of_float f) |> snd) in
              op_with_string op s string
          | `Date _ -> true)
        t
        string
  end

  module Report = struct
    (** Non-fatal problems collected while folding over a sink directory:
        errors (unreadable event files, unrecognized section directories) and
        warnings (unexpected file kinds, unknown event names). *)
    type item =
      [ `Error of
        [ `Parsing_event of
          [`Encoding of string * exn | `Json of string * error list]
        | `Cannot_recognize_section of string ]
      | `Warning of
        [ `Expecting_regular_file_at of string
        | `Expecting_directory_at of string
        | `Unknown_event_name_at of string * string ] ]

    (** Pretty-printer for report items. *)
    let pp fmt (x : item) =
      let open Format in
      let pp_parsing_failure fmt = function
        | `Encoding (path, exn) ->
            fprintf
              fmt
              "@[Parse error:@ wrong encoding for %S: %s@]"
              path
              (Printexc.to_string exn)
        | `Json (path, el) ->
            fprintf
              fmt
              "@[Parse error:@ wrong JSON for %S: %a@]"
              path
              pp_print_trace
              el
      in
      let pp_error fmt = function
        | `Parsing_event failure -> pp_parsing_failure fmt failure
        | `Cannot_recognize_section sec ->
            fprintf
              fmt
              "@[Directory error:@ cannot recognize section directory@ %S@]"
              sec
      in
      let pp_warning fmt = function
        | `Unknown_event_name_at (name, path) ->
            fprintf fmt "Unknown event name@ %S@ at@ %S" name path
        | `Expecting_directory_at path ->
            fprintf fmt "%S@ is not a directory" path
        | `Expecting_regular_file_at path ->
            fprintf fmt "%S@ is not a regular file" path
      in
      match x with
      | `Warning w -> fprintf fmt "@[Warning:@ %a@]" pp_warning w
      | `Error e -> fprintf fmt "@[Error:@ %a@]" pp_error e

    (* [make_return m (prev, value) warning] succeeds with [value] unchanged
       and with [m warning] added in front of the report [prev]. *)
    let make_return m ((prev : item list), value) warning =
      let report = m warning :: prev in
      Lwt.return_ok (report, value)

    (** [return_with_warning v e] adds the warning [e] to the report of [v]. *)
    let return_with_warning v e = make_return (fun w -> `Warning w) v e

    (** [return_with_error v e] adds the error [e] to the report of [v]. *)
    let return_with_error v e = make_return (fun err -> `Error err) v e
  end

  open Report

  (* [fold_event_kind_directory ~time_query path ~init ~f] walks the
     directory of one event kind, laid out as [path/date/time/file], and
     calls [f] on the path of every regular file found at the third level.
     Date and time directories rejected by [time_query] are skipped silently;
     entries of an unexpected kind add a warning to the report carried by the
     accumulator. *)
  let fold_event_kind_directory ~time_query path ~init ~f =
    let open Lwt_result_syntax in
    (* Third level: the event files of one [date/time] directory. *)
    let fold_time_directory date time acc =
      let time_dir = path // date // time in
      fold_directory time_dir ~init:(return acc) ~f:(fun acc -> function
        | `Directory ("." | "..") -> return acc
        | `Regular_file file -> f acc (time_dir // file)
        | `Directory p | `Special (_, p) ->
            return_with_warning
              acc
              (`Expecting_regular_file_at (time_dir // p)))
    in
    (* Second level: the time directories of one date directory. *)
    let fold_date_directory date acc =
      let date_dir = path // date in
      fold_directory date_dir ~init:(return acc) ~f:(fun acc -> function
        | `Directory ("." | "..") -> return acc
        | `Directory time ->
            if Time_constraint.check_time time_query time then
              fold_time_directory date time acc
            else (* filtered out *) return acc
        | `Regular_file p | `Special (_, p) ->
            return_with_warning acc (`Expecting_directory_at (date_dir // p)))
    in
    (* First level: the date directories. *)
    fold_directory path ~init:(return init) ~f:(fun acc -> function
      | `Directory ("." | "..") -> return acc
      | `Directory date ->
          if Time_constraint.check_date time_query date then
            fold_date_directory date acc
          else (* filtered out *) return acc
      | `Regular_file p | `Special (_, p) ->
          return_with_warning acc (`Expecting_directory_at (path // p)))

  (* [handle_event_kind_directory ~time_query ~section_path ~init ~f ev] folds
     [f] over the events of definition [ev] stored below
     [section_path // Event.name]. The accumulator is a pair (report, user
     value): [f] receives and returns the user value only.

     A file that cannot be read as JSON, or whose JSON cannot be decoded with
     the encoding of [ev], adds an error to the report and is skipped. Only
     the decoding step is guarded: an exception raised by [f] itself is not
     reported as a parsing error and propagates to the caller. *)
  let handle_event_kind_directory (type a) ~time_query ~section_path ~init ~f ev
      =
    let module Event = (val ev : Internal_event.EVENT_DEFINITION with type t = a)
    in
    let handle_event_file previous path =
      let open Lwt_result_syntax in
      let*! r = Lwt_utils_unix.Json.read_file path in
      match r with
      | Error el ->
          return_with_error previous (`Parsing_event (`Json (path, el)))
      | Ok json -> (
          match
            Data_encoding.Json.destruct (wrapped_encoding Event.encoding) json
          with
          | exception e ->
              return_with_error previous (`Parsing_event (`Encoding (path, e)))
          | {time_stamp; event; _} ->
              let* user_return =
                f
                  (snd previous)
                  ~time_stamp:(time_stamp :> float)
                  (Internal_event.Generic.Event (Event.name, ev, event))
              in
              return (fst previous, user_return))
    in
    fold_event_kind_directory
      ~time_query
      (section_path // Event.name)
      ~init
      ~f:(fun prev file -> handle_event_file prev file)

  (* [fold ?on_unknown ?only_sections ?only_names ?time_query uri ~init ~f]
     folds [f] over the events stored by the sink configured with [uri] and
     returns the pair (report, final user value).

     - [only_sections]: when given, only section directories whose decoded
       name (see [Section_dir.section_name]) is in the list are visited.
     - [only_names]: when given, only event directories whose name is in the
       list are visited.
     - [time_query] (default [`All]): constraint on the date and time
       directories.
     - [on_unknown]: called on every file of an event directory whose name
       matches no registered definition; without it, such a directory only
       adds a warning to the report.

     The accumulator (report and user value) is threaded through every
     section directory, so the result covers all visited sections. *)
  let fold ?on_unknown ?only_sections ?only_names ?(time_query = `All) uri ~init
      ~f =
    let open Lwt_result_syntax in
    let name_matches =
      match only_names with
      | None -> fun _ -> true
      | Some l -> fun name -> List.mem ~equal:String.equal name l
    in
    let section_matches =
      match only_sections with
      | None -> fun _ -> true
      | Some l -> fun name -> List.mem ~equal:(Option.equal String.equal) name l
    in
    let* {path = sink_path; _} = configure uri in
    fold_directory
      sink_path
      ~init:(return ([], init))
      ~f:
        (fun previous -> function
          | `Directory ("." | "..") -> return previous
          | `Directory dir -> (
              match Section_dir.section_name dir with
              | Ok sec when section_matches sec ->
                  fold_directory
                    (sink_path // dir)
                    (* Continue from what the previous sections produced. *)
                    ~init:(return previous)
                    ~f:
                      (fun previous -> function
                        | `Directory ("." | "..") -> return previous
                        | `Directory event_name when name_matches event_name
                          -> (
                            let open Internal_event in
                            match
                              All_definitions.find (String.equal event_name)
                            with
                            | Some (Generic.Definition (_, _, ev)) ->
                                handle_event_kind_directory
                                  ~time_query
                                  ev
                                  ~section_path:(sink_path // dir)
                                  ~init:previous
                                  ~f
                            | None -> (
                                match on_unknown with
                                | None ->
                                    return_with_warning
                                      previous
                                      (`Unknown_event_name_at
                                        (event_name, sink_path // dir))
                                | Some f ->
                                    fold_event_kind_directory
                                      ~time_query
                                      (sink_path // dir // event_name)
                                      ~init:previous
                                      ~f:(fun prev file ->
                                        let* () = f file in
                                        return prev)))
                        | `Directory _ (* filtered out *) -> return previous
                        | `Regular_file p | `Special (_, p) ->
                            (* [p] is an entry of the section directory. *)
                            return_with_warning
                              previous
                              (`Expecting_directory_at (sink_path // dir // p)))
              | Ok _ (* section does not match *) -> return previous
              | Error _ ->
                  return_with_error previous (`Cannot_recognize_section dir))
          | `Regular_file p | `Special (_, p) ->
              return_with_warning
                previous
                (`Expecting_directory_at (sink_path // p)))
end
(* back to top *)