(**************************************************************************) (* ParMap: a simple library to perform Map computations on a multi-core *) (* *) (* Author(s): Marco Danelutto, Roberto Di Cosmo *) (* *) (* This library is free software: you can redistribute it and/or modify *) (* it under the terms of the GNU Lesser General Public License as *) (* published by the Free Software Foundation, either version 3 of the *) (* License, or (at your option) any later version. A special linking *) (* exception to the GNU Lesser General Public License applies to this *) (* library, see the LICENSE file for more information. *) (**************************************************************************) open ExtLib (* unmarshal from a mmap seen as a bigarray *) let unmarshal fd = let a=Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true (-1) in let read_mmap ofs len = let s = String.make len ' ' in for k = 0 to len-1 do s.[k]<-a.{ofs+k} done; s in (* read the header *) let s = read_mmap 0 Marshal.header_size in let size=Marshal.total_size s 0 in let s' = read_mmap 0 size in Unix.close fd; Marshal.from_string s' 0 (* marshal to a mmap seen as a bigarray *) let marshal pid fd v = let s = Marshal.to_string v [Marshal.Closures] in let sl = (String.length s) in let ba = Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true sl in for k = 0 to sl-1 do ba.{k} <-s.[k] done; Unix.close fd (* create a shadow file descriptor *) let tempfd () = let name = Filename.temp_file "mmap" "TMP" in try let fd = Unix.openfile name [Unix.O_RDWR; Unix.O_CREAT] 0o600 in Unix.unlink name; fd with e -> Unix.unlink name; raise e (* the core parallel mapfold function *) let parmapfold (f:'a -> 'b) (l:'a list) (op:'b->'c->'c) (opid:'c) ?(ncores=1) : 'c= (* flush everything *) flush stdout; flush stderr; (* init task parameters *) let ln = List.length l in let chunksize = ln/ncores in let fdarr=Array.init ncores (fun _ -> tempfd()) in for i = 0 to ncores-1 do match Unix.fork() with 0 -> begin let pid = Unix.getpid() in let reschunk=ref opid in let lo=i*chunksize in let hi=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in (* iterate in reverse order, to accumulate in the right order *) for j=0 to (hi-lo) do try reschunk := op (f (List.nth l (hi-j))) !reschunk with _ -> (Printf.printf "Error: j=%d\n" j) done; marshal pid fdarr.(i) !reschunk; exit 0 end | -1 -> Printf.eprintf "Fork error: pid %d; i=%d.\n" (Unix.getpid()) i; | pid -> () done; (* wait for all childrens *) for i = 0 to ncores-1 do try ignore(Unix.wait()) with Unix.Unix_error (Unix.ECHILD, _, _) -> () done; (* read in all data *) let res = ref [] in (* iterate in reverse order, to accumulate in the right order *) for i = 0 to ncores-1 do res:= (unmarshal fdarr.((ncores-1)-i))::!res; done; (* use extLib's tail recursive one *) List.fold_right op (List.flatten !res) opid ;; (* the parallel map function *) let parmap (f:'a -> 'b) (l:'a list) ?(ncores=1) : 'b list= parmapfold f l (fun v acc -> v::acc) [] ~ncores ;; (* the parallel fold function *) let parfold (l:'a list) (op:'a -> 'b -> 'b) (opid:'b) ?(ncores=1) : 'b= parmapfold (fun x -> x) l op opid ~ncores ;;