swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9
Raw File
Tip revision: ad58ae426e7e9200530d18bf439d02657503426c authored by fclement on 23 November 2010, 11:33:06 UTC
Ignore all generated files.
Tip revision: ad58ae4
streams.ml
(***********************************************************************)
(*                                                                     *)
(*                          OCamlP3l                                   *)
(*                                                                     *)
(* (C) 2004-2007                                                       *)
(*             Roberto Di Cosmo (dicosmo@dicosmo.org)                  *)
(*             Zheng Li (zli@lip6.fr)                                  *)
(*             Pierre Weis (Pierre.Weis@inria.fr)                      *)
(*             Francois Clement (Francois.Clement@inria.fr)            *)
(*                                                                     *)
(* Based on original Ocaml P3L System                                  *)
(* (C) 1997 by                                                         *)
(*             Roberto Di Cosmo (dicosmo@ens.fr)                       *)
(*             Marco Danelutto (marcod@di.unipi.it)                    *)
(*             Xavier Leroy  (Xavier.Leroy@inria.fr)                   *)
(*             Susanna Pelagatti (susanna@di.unipi.it)                 *)
(*                                                                     *)
(* This program is free software; you can redistribute it and/or       *)
(* modify it under the terms of the GNU Library General Public License *)
(* as published by the Free Software Foundation; either version 2      *)
(* of the License, or (at your option) any later version.              *)
(*                                                                     *)
(* This program is distributed in the hope that it will be useful,     *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of      *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the       *)
(* GNU Library General Public License for more details.                *)
(*                                                                     *)
(***********************************************************************)

(* $Id: streams.ml,v 1.8 2007-01-23 15:50:04 weis Exp $ *)

open P3lstream;;

(*********************************)
(*  STREAM LANGUAGE BEGINS HERE  *)
(*********************************)

(* Here goes a sequential semantics interpreter of the stream language
   which can be used to program skeletons. The interpreter is based on
   the stream's mathematical model above. *)

let streamer: (('a pkg -> 'b pkg) * color) -> ('a s -> 'b s) = 
  fun (f, _) s -> 
    from_fun (fun () -> f (pkg_next s));;


(* Internal producer take a function f (which can generate a internal
   'b stream from each 'a package) and a 'a stream s, then produce a 'b
   stream s' which flattens all internal 'b streams produced by applying
   f to each item of s *)
let dive: (('a pkg -> 'b pkg array) * color) -> 'a s -> 'b s = 
  fun (f,_) s ->
    from_fun 
      ( let i_array = ref [| |] in
	let i_len = ref (Array.length !i_array) in
	let i_ind = ref (!i_len) in
	let rec genf =
	  fun () -> 
	    if (!i_ind) >= (!i_len) then 
	      let p = pkg_next s in
	      i_array := f p;
	      i_len := Array.length (!i_array);
	      i_ind := 0;
	      (* do not produce [||] from Eos, semantically, it
		 will produce infinite loop *)
	      genf ()
	    else 
	      let tmp = (!i_array).(!i_ind) in
	      i_ind := (!i_ind) + 1; 
	      tmp
	in genf
      ) ;;


(* Internal consumer take a function f (which can consume a 'a "sub"
   stream to single 'b package) a 'a stream then produce 'b output
   stream. It should be the counterpart of producer function. However,
   in most cases, a package to be generated depends on a set of
   associated pkgs from the input stream which are yet unconsecutive.
   the consumer actually provide a internal buffering and ordering
   mechanics for that: it will produce a valid substream to feed the
   function f according to the following convention: (for package
   Pac((n,l)::t, v), we call t as tag base, (n,l) as current tag, n as
   sub-pac amount, l as sub-pac number)

   (1) A consumer always consider the packages with the same tag base as
   a group. A group is considered valid until all its packages arrives,
   i.e. for any i \in (0,l) Pac((i,l)::t,v) has arrived, then this group
   (as a ordered substream) is feed to function f. 

   (2) For those group with infinite(or sub-pac amount unknow) packages
   which always has (l=-1), they will be considered valid when certain
   Sign arrives (to be developed)
*)

let unif: (('a pkg list -> ('b pkg * 'a pkg list) option) * color) -> 'a s -> 'b s = 
  fun (f,_) ->
    let ihash = Hashtbl.create 16 in
    fun s -> from_fun (
      let rec ask () =
	    match pkg_next s with
	      | Sign _ as p -> p
	      | Pac(x::t, v) as p -> 
	          let () = Hashtbl.add ihash t p in
	          let pl = Hashtbl.find_all ihash t in
              (match (f pl) with
                 | None -> 
                     ask ()
                 | Some (op, rl) -> 
                     List.iter (fun _ -> Hashtbl.remove ihash t) pl;
                     List.iter (fun x -> Hashtbl.add ihash t x) rl;
                     op
              )
	      | _ -> assert false in
      ask );;


(*
let chk_avail_ref = ref (fun () -> [-1]);;
let chk_avail () = !chk_avail_ref ();;

let chk_demand_ref = ref (fun () -> [-1]);;
let chk_demand () = !chk_demand_ref () ;;
*)


let split: (int * ('a pkg -> int) * color) -> 'a s -> 'a s array = 
  fun (n,cf,_) s ->
    let sa = Array.make n (singleton (Sign Eos)) in
    let lock = ref true in
    let pfun i = from_fun( 
	  let genf = 
	    fun () ->
	      if not (!lock) then Sign Eos else
	        let () = lock := false in
	        let e = pkg_next s in
	        let () = lock := true in
	        (match e with
		       | Sign _ -> e
		       | _ ->
		           let j = cf e in
		           (* the definition here is tricky, however it must
			          be like this considering recursive skeleton *)
		           buff_pkg e sa.(j); pkg_next sa.(i))
	  in genf) in
    Array.iteri (fun i _ -> sa.(i) <- pfun i) sa;
    sa;;


let combine: ((unit -> int) * color) -> 'a s array -> 'a s = 
  fun (cf,_) sa ->
    let l = Array.length sa in
    from_fun( 
      let genf () =
	    (* flag array has to be flashed for each request, considering
	       recursive stream together with network buffering*)
	    let fa = Array.make l true in
	    let rec aux_genf () =
	      if not (Array.fold_left (||) false fa) then Sign Eos
	      else
	        let b = cf () in
	        if not fa.(b) then aux_genf() 
	        else 
              (match pkg_next (sa.(b)) with
	             | Sign Eos -> fa.(b) <- false; aux_genf ()
	             | Pac _ as x  -> x ) in
        aux_genf () in
      genf);;


let switch: (('a pkg -> bool) * color) -> 'a s -> ('a s * 'a s) = 
  fun (cf,col) ->
    let mcf = fun e -> if cf e then 0 else 1 in
    fun s -> 
      let sa = split (2, mcf, col) s in
      sa.(0), sa.(1);;

let recur: 
    (int * int) -> ('a s array -> 'a s array) -> ('a s array -> 'a s array) =
  fun (m,n) spnx ->
    let m_pipe = singleton (Sign Eos) in
    fun sa -> 
      let sai = 
        Array.init (Array.length sa + 1) 
	      (fun x -> 
	         if x = m then m_pipe
	         else if x > m then sa.(x-1) else sa.(x)) in
      let sao = spnx sai in
      (* the following trick is ugly, we should find a better way to
         define "recursive with undefined initial input" *)
      m_pipe.gen <- (fun () -> pkg_next sao.(n));
      Array.init (Array.length sa)
	    (fun x -> if x<n then sao.(x) else sao.(x+1));;
back to top