Revision 6ee5d28a8988618b187e10ebbe680be736c694c4 authored by fclement on 21 September 2006, 15:57:14 UTC, committed by fclement on 21 September 2006, 15:57:14 UTC
1 parent 4442830
streams.ml
(***********************************************************************)
(* *)
(* OcamlP3l *)
(* *)
(* Zheng Li, PPS Lab, Universite Paris 7 *)
(* *)
(* This program is free software; you can redistribute it and/or *)
(* modify it under the terms of the GNU Library General Public *)
(* License as published by the Free Software Foundation; either *)
(* version 2 of the License, or (at your option) any later version. *)
(* *)
(* This program is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *)
(* GNU Library General Public License for more details. *)
(* *)
(***********************************************************************)
(* $Id: streams.ml,v 1.6 2006-02-27 15:59:24 fclement Exp $ *)
open P3lstream;;
(*********************************)
(* STREAM LANGUAGE BEGINS HERE *)
(*********************************)
(* What follows is a sequential-semantics interpreter for the stream
language, which can be used to program skeletons. The interpreter is
based on the stream's mathematical model above. *)
(* [streamer (f, _) s] builds the stream obtained by applying [f] to the
   packages of [s]. Each request on the result pulls exactly one package
   from [s] with [pkg_next] and returns [f] applied to it; [f] receives
   whatever [pkg_next] returns. The color component of the pair is
   ignored. *)
let streamer: (('a pkg -> 'b pkg) * color) -> ('a s -> 'b s) =
  fun (transform, _) input ->
    let produce () =
      let incoming = pkg_next input in
      transform incoming
    in
    from_fun produce;;
(* Internal producer. [dive (f, _) s] takes a function [f] that turns one
   'a package into an array of 'b packages (an internal 'b stream) and an
   'a stream [s]; it returns the 'b stream that flattens, in order, all
   the arrays obtained by applying [f] to the successive packages of [s].
   The color component of the pair is ignored. *)
let dive: (('a pkg -> 'b pkg array) * color) -> 'a s -> 'b s =
  fun (expand, _) input ->
    (* Packages produced by the most recent call to [expand], and the
       position of the next one to hand out. Starting from an empty batch
       forces a refill on the very first request. *)
    let batch = ref [||] in
    let cursor = ref 0 in
    let rec next_pkg () =
      if !cursor < Array.length !batch then begin
        let item = (!batch).(!cursor) in
        incr cursor;
        item
      end else begin
        (* Current batch exhausted: read one more input package and expand
           it. [expand] must not answer the end-of-stream package with an
           empty array, otherwise this refill loop never terminates. *)
        batch := expand (pkg_next input);
        cursor := 0;
        next_pkg ()
      end
    in
    from_fun next_pkg;;
(* Internal consumer, the counterpart of the producer function.
   [unif (f, _) s] takes a function [f] that can consume an 'a sub-stream
   into a single 'b package, and an 'a stream [s]; it returns the 'b output
   stream. The color component of the pair is ignored.

   In most cases a package to be generated depends on a set of associated
   packages of the input stream that do not arrive consecutively, so the
   consumer provides an internal buffering mechanism. For a package
   [Pac ((n, l) :: t, v)] we call [t] the tag base, [(n, l)] the current
   tag, [n] the sub-pac amount and [l] the sub-pac number.

   What the code does:
   - every incoming [Pac] is stored in a hash table under its tag base;
   - [f] is then called with all the packages currently stored under that
     tag base, as returned by [Hashtbl.find_all] (most recently added
     first);
   - if [f] answers [None] the group is not ready yet and the next input
     package is read;
   - if [f] answers [Some (op, rl)] the packages that were passed to [f]
     are removed from the table, the packages of [rl] are stored back
     under the same tag base, and [op] is returned as the next output
     package;
   - sign packages are forwarded to the output without going through [f].

   Intended convention for deciding when a group is ready (design notes of
   the original authors; the decision itself is taken by [f], not here):
   (1) A consumer always considers the packages with the same tag base as
   a group. A group is considered valid once all its packages have
   arrived, i.e. for any i in (0,l), [Pac ((i, l) :: t, v)] has arrived;
   then this group (as an ordered sub-stream) is fed to function [f].
   (2) Groups with infinitely many packages (or with an unknown sub-pac
   amount), which always have l = -1, will be considered valid when a
   certain Sign arrives (to be developed).

   The table is created when [unif] is applied to its first argument, so
   it is shared by every stream later built from that same partial
   application.

   A [Pac] whose tag list is empty is a programming error and triggers
   [assert false]. *)
let unif: (('a pkg list -> ('b pkg * 'a pkg list) option) * color) -> 'a s -> 'b s =
  fun (f, _) ->
    let pending = Hashtbl.create 16 in
    fun s ->
      let rec ask () =
        match pkg_next s with
        (* Rebuild the sign instead of returning the input package itself:
           returning the matched value would force the output package type
           to be the input package type, silently narrowing the annotated
           signature to 'a s -> 'a s. *)
        | Sign sg -> Sign sg
        | Pac (_ :: base, _) as p ->
          Hashtbl.add pending base p;
          let group = Hashtbl.find_all pending base in
          (match f group with
           | None -> ask ()
           | Some (out, rest) ->
             (* One [Hashtbl.remove] per package handed to [f] drops exactly
                the bindings that [find_all] returned. *)
             List.iter (fun _ -> Hashtbl.remove pending base) group;
             List.iter (fun r -> Hashtbl.add pending base r) rest;
             out)
        | _ -> assert false
      in
      from_fun ask;;
(*
let chk_avail_ref = ref (fun () -> [-1]);;
let chk_avail () = !chk_avail_ref ();;
let chk_demand_ref = ref (fun () -> [-1]);;
let chk_demand () = !chk_demand_ref () ;;
*)
(* [split (n, cf, _) s] fans the stream [s] out into an array of [n]
   streams. The color component of the triple is ignored.

   A request on output [i] reads one package from [s]:
   - a sign package is returned directly to the requester;
   - any other package [e] is handed to [buff_pkg] together with output
     number [cf e], after which output [i] is asked again through
     [pkg_next] (presumably [buff_pkg] queues [e] on that output -- see
     P3lstream).

   While a read from [s] is in progress, any re-entrant request on one of
   the outputs is answered with [Sign Eos]. *)
let split: (int * ('a pkg -> int) * color) -> 'a s -> 'a s array =
  fun (n, cf, _) s ->
    (* Placeholder streams; every slot is overwritten below. *)
    let outputs = Array.make n (singleton (Sign Eos)) in
    (* [true] when no read from [s] is currently in progress. *)
    let idle = ref true in
    let make_output i =
      let pull () =
        if !idle then begin
          idle := false;
          let e = pkg_next s in
          idle := true;
          match e with
          | Sign _ -> e
          | _ ->
            let j = cf e in
            (* The definition here is tricky; it has to be written this
               way because of recursive skeletons. *)
            buff_pkg e outputs.(j);
            pkg_next outputs.(i)
        end
        else Sign Eos
      in
      from_fun pull
    in
    Array.iteri (fun i _ -> outputs.(i) <- make_output i) outputs;
    outputs;;
(* [combine (cf, _) sa] merges the streams of [sa] into a single stream.
   The color component of the pair is ignored.

   For each request, [cf ()] is called repeatedly to choose which input to
   read. An input that answers [Sign Eos] is marked as finished for the
   rest of that request and another choice is made; the first [Pac]
   obtained is returned. When every input has been marked finished during
   the request, [Sign Eos] is returned.

   NOTE(review): the match on the package read only covers [Sign Eos] and
   [Pac _]; any other package shape would raise [Match_failure] -- confirm
   against the definition of [pkg]. *)
let combine: ((unit -> int) * color) -> 'a s array -> 'a s =
  fun (choose, _) inputs ->
    let width = Array.length inputs in
    let request () =
      (* The flag array is rebuilt for every request: with recursive
         streams and network buffering, an input that answered end of
         stream once may deliver packages later. *)
      let live = Array.make width true in
      let rec poll () =
        if Array.fold_left (||) false live then begin
          let k = choose () in
          if live.(k) then
            (match pkg_next inputs.(k) with
             | Sign Eos -> live.(k) <- false; poll ()
             | Pac _ as pkg -> pkg)
          else poll ()
        end
        else Sign Eos
      in
      poll ()
    in
    from_fun request;;
(* [switch (cf, col) s] is the two-way special case of [split]: it returns
   a pair of streams built by [split] with routing function
   "0 if [cf e] holds, 1 otherwise". The first component is output 0 and
   the second is output 1. [col] is passed on to [split] unchanged. *)
let switch: (('a pkg -> bool) * color) -> 'a s -> ('a s * 'a s) =
  fun (pred, col) ->
    let route pkg =
      match pred pkg with
      | true -> 0
      | false -> 1
    in
    fun input ->
      let halves = split (2, route, col) input in
      (halves.(0), halves.(1));;
(* [recur (m, n) spnx sa] closes a feedback loop around the stream network
   [spnx]: a feedback stream is inserted at position [m] of the inputs
   given to [spnx], and that feedback stream is fed from output [n] of
   [spnx]. The result is the array of the outputs of [spnx] with output
   [n] left out.

   The feedback stream is created when [recur] is applied to [(m, n)] and
   [spnx]; every later application to an input array reuses it and
   overwrites its generator.

   NOTE(review): the result always has as many streams as [sa], so [spnx]
   is expected to return one more stream than [sa] contains -- confirm
   with callers. *)
let recur:
  (int * int) -> ('a s array -> 'a s array) -> ('a s array -> 'a s array) =
  fun (m, n) spnx ->
    let feedback = singleton (Sign Eos) in
    fun sa ->
      let arity = Array.length sa in
      (* Inputs of [spnx]: the streams of [sa] with [feedback] slotted in
         at index [m]. *)
      let input_at k =
        if k < m then sa.(k)
        else if k = m then feedback
        else sa.(k - 1)
      in
      let sao = spnx (Array.init (arity + 1) input_at) in
      (* The following trick is ugly; a better way to define "recursive
         with undefined initial input" should be found. Output [n] is
         looked up on every request, not when the loop is built. *)
      feedback.gen <- (fun () -> pkg_next sao.(n));
      (* Outputs: everything [spnx] returned except output [n]. *)
      let output_at k = sao.(if k < n then k else k + 1) in
      Array.init arity output_at;;
Computing file changes ...