Revision a1e0fc4e5c86c4a179c06151a8b5ae66c3d08c28 authored by Roberto Di Cosmo on 18 July 2011, 14:53:54 UTC, committed by Roberto Di Cosmo on 18 July 2011, 14:53:54 UTC
0 parent
parmap.ml
(**************************************************************************)
(* ParMap: a simple library to perform Map computations on a multi-core machine *)
(* *)
(* Main author(s): Marco Danelutto, Roberto Di Cosmo *)
(* *)
(* This library is free software: you can redistribute it and/or modify *)
(* it under the terms of the GNU Lesser General Public License as *)
(* published by the Free Software Foundation, either version 3 of the *)
(* License, or (at your option) any later version. A special linking *)
(* exception to the GNU Lesser General Public License applies to this *)
(* library, see the COPYING file for more information. *)
(**************************************************************************)
(** [mkpipe ()] creates a Unix pipe and returns four closures
    [(write, read, close_write, close_read)]:
    - [write x] marshals [x] (closures allowed) into the pipe and flushes it;
    - [read ()] unmarshals the next value from the pipe; it raises
      [End_of_file] once every write end is closed and the pipe is drained;
    - [close_write ()] flushes and closes the write end;
    - [close_read ()] closes the read end.

    Both closing functions print a trace line on standard output.
    Unmarshalling is not type-safe: the reader must expect the type that the
    writer actually sent.
    @raise Unix.Unix_error if the pipe cannot be created. *)
let mkpipe () =
  (* Unix.pipe returns (read end, write end), in that order. *)
  let (rd, wr) = Unix.pipe () in
  (* One channel per end, created once: a fresh channel per call would own a
     fresh buffer, so buffered data would be lost between calls. *)
  let oc = Unix.out_channel_of_descr wr
  and ic = Unix.in_channel_of_descr rd in
  ((fun x ->
      Marshal.to_channel oc x [Marshal.Closures];
      (* push the data into the pipe now; the reader cannot see the buffer *)
      flush oc),
   (fun () -> Marshal.from_channel ic),
   (fun () ->
      Printf.printf "Closing out (I am %d)\n" (Unix.getpid());
      (* close_out flushes pending data before releasing the descriptor *)
      close_out oc),
   (fun () ->
      Printf.printf "Closing in (I am %d)\n" (Unix.getpid());
      close_in ic))
;;
(** [parmap f l ~ncores] returns the same list as [List.map f l], but
    computes it in forked worker processes.

    The input is cut into contiguous chunks of near-equal size, one per
    worker; at most [ncores] workers are forked, and never more than there
    are elements. Each worker applies [f] to its chunk in list order and
    sends the results back to the parent through a pipe using [Marshal].
    The chunks are concatenated in the original order.

    [f] runs in a child process, so side effects it performs on memory are
    not visible to the caller. Results must be marshallable.

    Because the optional argument comes last, OCaml cannot erase it: callers
    have to pass [~ncores] explicitly to obtain the result.

    @raise Invalid_argument if [ncores < 1].
    @raise Failure if a worker raised while applying [f], or terminated
           without sending its results.
    @raise Unix.Unix_error if a pipe or a process cannot be created. *)
let parmap (f : 'a -> 'b) (l : 'a list) ?(ncores=1) : 'b list =
  if ncores < 1 then invalid_arg "parmap: ncores must be at least 1";
  (* An array gives O(1) access to each element of a chunk. *)
  let items = Array.of_list l in
  let total = Array.length items in
  let nworkers = min ncores total in
  (* Wait for one specific child; never steal an unrelated child's status. *)
  let rec reap pid =
    try ignore (Unix.waitpid [] pid)
    with
    | Unix.Unix_error (Unix.EINTR, _, _) -> reap pid
    | Unix.Unix_error (Unix.ECHILD, _, _) -> ()
  in
  (* Read a worker's reply, release its pipe, then reap it. Reading before
     waiting matters: a worker blocks in its write until the parent drains
     the pipe, so waiting first deadlocks on large results. *)
  let collect (pid, ic) =
    let reply =
      try Marshal.from_channel ic
      with End_of_file | Failure _ ->
        (Some (Printf.sprintf "worker %d sent no result" pid), [])
    in
    close_in ic;
    reap pid;
    reply
  in
  (* Fork the worker in charge of chunk [i]; in the parent, return its pid
     and the channel its reply will arrive on. *)
  let spawn i =
    let lo = i * total / nworkers and hi = (i + 1) * total / nworkers in
    let (rd, wr) = Unix.pipe () in
    let pid =
      try Unix.fork ()
      with e -> Unix.close rd; Unix.close wr; raise e
    in
    if pid = 0 then begin
      (* Child. Nothing may escape this branch, otherwise the child would
         carry on running the caller's program. *)
      let status =
        try
          Unix.close rd;
          let reply =
            try
              let acc = ref [] in
              for j = lo to hi - 1 do acc := f items.(j) :: !acc done;
              (None, List.rev !acc)
            with e ->
              (* ship the failure to the parent instead of dropping items *)
              (Some (Printexc.to_string e), [])
          in
          let oc = Unix.out_channel_of_descr wr in
          Marshal.to_channel oc reply [Marshal.Closures];
          close_out oc;
          0
        with e ->
          prerr_endline ("parmap: worker aborted: " ^ Printexc.to_string e);
          1
      in
      exit status
    end else begin
      (* Parent: keep only the read end, so that the worker's exit is seen
         as end-of-file on the pipe. *)
      Unix.close wr;
      (pid, Unix.in_channel_of_descr rd)
    end
  in
  (* Flush before forking so pending output is not duplicated in children. *)
  flush stdout; flush stderr;
  let started = ref [] in
  (try
     for i = 0 to nworkers - 1 do
       started := spawn i :: !started
     done
   with e ->
     (* do not leave workers and descriptors behind if a later fork fails *)
     List.iter (fun w -> ignore (collect w)) !started;
     raise e);
  (* [started] is in reverse spawn order; rev_map restores chunk order. *)
  let replies = List.rev_map collect !started in
  (* Every worker has been reaped by now, so raising here leaks nothing. *)
  List.concat
    (List.map
       (function
         | (None, chunk) -> chunk
         | (Some msg, _) -> failwith ("parmap: worker failed: " ^ msg))
       replies)
;;
(* Smoke test: increment the integers 1..12 through [parmap] with a single
   worker and print each result on its own line. [List.iter] is used because
   the printing is done purely for its side effect. *)
let () =
  List.iter
    (fun n -> Printf.printf "%d\n" n)
    (parmap (fun x -> x+1) [1;2;3;4;5;6;7;8;9;10;11;12] ~ncores:1)
;;
Computing file changes ...