(* Residue of the web page this file was retrieved from, kept as a comment:
   We are hiring ! See our job offers.
   Raw File
   Tip revision: f4f47e595980e39f9780b4538dbfe82d93c77cbe authored by Roberto Di Cosmo on 26 October 2011, 14:49:33 UTC
   Redirect stdout/stderr to one separate file per worker (avoids intermixed output).
   Tip revision: f4f47e5 *)
(* ParMap: a simple library to perform Map computations on a multi-core   *)
(*                                                                        *)
(*  Author(s):  Marco Danelutto, Roberto Di Cosmo                         *)
(*                                                                        *)
(*  This library is free software: you can redistribute it and/or modify  *)
(*  it under the terms of the GNU Lesser General Public License as        *)
(*  published by the Free Software Foundation, either version 3 of the    *)
(*  License, or (at your option) any later version.  A special linking    *)
(*  exception to the GNU Lesser General Public License applies to this    *)
(*  library, see the LICENSE file for more information.                   *)

open ExtLib

(* when true, master and workers trace the messages they exchange on stderr *)
let debug=false;;

(* utils; would disappear if we use list comprehension from Batteries *)

(* [ext_intv startv endv] is the increasing list of all the integers lying
   between the two bounds, both included; the bounds may be given in either
   order. *)
let ext_intv startv endv =
  let lo = min startv endv and hi = max startv endv in
  let collected = ref [] in
  (* walk downwards so that consing leaves the smallest value at the head *)
  for n = hi downto lo do
    collected := n :: !collected
  done;
  !collected

(* freopen emulation, from Xavier's suggestion on OCaml mailing list *)

(* [reopen_out outchan filename] makes the file descriptor underlying
   [outchan] point to [filename], which is created if needed and truncated.
   Pending output is flushed to the old destination first.
   Raises [Unix.Unix_error] if the file cannot be opened or the descriptor
   cannot be duplicated. *)
let reopen_out outchan filename =
  flush outchan;
  let fd1 = Unix.descr_of_out_channel outchan in
  let fd2 =
    Unix.openfile filename [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC] 0o666 in
  (* do not leak the freshly opened descriptor when the duplication fails *)
  (try Unix.dup2 fd2 fd1
   with e -> Unix.close fd2; raise e);
  (* fd1 now refers to the file, the temporary descriptor can go *)
  Unix.close fd2

(* unmarshal from a mmap seen as a bigarray *)
(* [unmarshal fd] maps the file behind [fd] in memory, rebuilds the value that
   was marshalled at its beginning, closes [fd] and returns the value.
   The result type is not checked: as with [Marshal.from_string], the caller
   must constrain it. *)
let unmarshal fd =
 (* a dimension of -1 lets map_file size the array from the file itself *)
 let a=Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true (-1) in
 (* copy [len] bytes of the mapping, starting at offset [ofs], into a string *)
 let read_mmap ofs len =
   let s = String.make len ' ' in
   for k = 0 to len-1 do s.[k]<-a.{ofs+k} done;
   s
 in
 (* read the header *)
 let s = read_mmap 0 Marshal.header_size in
 (* the header tells how many bytes the whole marshalled value occupies *)
 let size=Marshal.total_size s 0 in
 let s' = read_mmap 0 size in
 Unix.close fd;
 Marshal.from_string s' 0

(* marshal to a mmap seen as a bigarray *)

(* [marshal pid fd v] serialises [v] (closures allowed), writes the bytes at
   the start of the file behind [fd] through a shared memory mapping sized to
   the payload, then closes [fd].  [pid] is not used. *)
let marshal pid fd v =
  let payload = Marshal.to_string v [Marshal.Closures] in
  let size = String.length payload in
  let mapped =
    Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true size in
  (* byte by byte copy of the payload into the mapping *)
  let rec copy k =
    if k < size then begin
      Bigarray.Array1.set mapped k (String.get payload k);
      copy (k + 1)
    end
  in
  copy 0;
  Unix.close fd

(* create a shadow file descriptor *)

(* [tempfd ()] returns a read/write descriptor on a fresh temporary file whose
   name has already been removed from the file system: the file lives only as
   long as the descriptor stays open.
   Raises the exception of the failing system call; the temporary name is
   removed (best effort) before re-raising. *)
let tempfd () =
  let name = Filename.temp_file "mmap" "TMP" in
  let fd =
    try Unix.openfile name [Unix.O_RDWR; Unix.O_CREAT] 0o600
    with e ->
      (* do not let a failing cleanup hide the original error *)
      (try Unix.unlink name with Unix.Unix_error _ -> ());
      raise e
  in
  (* if the name cannot be removed, give the descriptor back to the system *)
  (try Unix.unlink name with e -> Unix.close fd; raise e);
  fd

(* setup a service on a free port *)

(* [setup_server quesize] opens a listening TCP socket on a port chosen by the
   system, with a backlog of [quesize] pending connections.
   Returns [(sock, sockaddr, port)]: the listening socket, the address it is
   actually bound to, and the port number extracted from that address.
   Raises [Unix.Unix_error] when a system call fails; the socket is closed
   before the exception is propagated. *)
let setup_server quesize =
  let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  try
    Unix.setsockopt sock Unix.SO_REUSEADDR true;
    Unix.setsockopt sock Unix.SO_KEEPALIVE true;
    (* no need yet to perform tuning at this level
      Unix.setsockopt_int sock Unix.SO_SNDBUF 1000;
      Unix.setsockopt_int sock Unix.SO_RCVBUF 1000;
    *)
    (* using zero here forces the system to provide a free port *)
    let sockaddr = Unix.ADDR_INET (Unix.inet_addr_any, 0) in
    Unix.bind sock sockaddr;
    Unix.listen sock quesize;
    (* get back the address with the chosen port *)
    let sockaddr = Unix.getsockname sock in
    let port = match sockaddr with
      | Unix.ADDR_INET(_,x) -> x
      | Unix.ADDR_UNIX _ -> assert false (* the socket was created as PF_INET *)
    in (sock, sockaddr, port)
  with e -> Unix.close sock; raise e

(* input collections accepted by the parallel combinators:
   a list (constructor L) or an array (constructor A) *)
type 'a sequence = L of 'a list | A of 'a array;;

(* the type of messages exchanged between master and workers:
   - Ready    : sent by a worker asking for work
   - Task n   : sent by the master, n is the index of the chunk to process
   - Finished : sent by the master when no task is left
   - Error    : sent by a worker when the mapped function raised; carries an
                integer tag and the printed exception *)

type msg = Ready | Finished | Task of int | Error of int * string;;

(* the core parallel mapfold function *)

(* [parmapfold ~ncores ~chunksize f s op opid concat] computes, in [ncores]
   forked worker processes, a fold of [op] over the images by [f] of the
   elements of [s].

   - The input is cut in chunks.  Without [chunksize] there are [ncores]
     chunks of [length / ncores] elements; with [chunksize] there are
     [length / chunksize] chunks (at least one).  In both cases the last
     chunk also takes the remaining elements.
   - Each worker starts from [opid], asks the master for chunks over a local
     TCP connection, and accumulates [op (f x) acc] over every chunk it is
     given, walking each chunk from its last element to its first.
   - When no chunk is left each worker writes its accumulator to its own
     temporary file; the master reads them back and combines them with
     [List.fold_right concat results opid], in worker order.

   NOTE(review): chunks are handed out to whichever worker asks first, so when
   a worker receives several chunks the way partial results are interleaved
   depends on scheduling -- confirm before relying on element order.

   Side effects: every worker redirects its stdout and stderr to the files
   "stdout.<i>" and "stderr.<i>" of the current directory, and logs timing
   information there.  If [f] or [op] raises in a worker, the error is logged,
   reported to the master, and the master calls [exit 1]. *)
let parmapfold ?(ncores=1) ?(chunksize) (f:'a -> 'b) (s:'a sequence) (op:'b->'c->'c) (opid:'c) (concat:'c->'c->'c) : 'c=
  (* flush everything before forking *)
  flush stdout; flush stderr;
  (* enforce array to speed up access to the list elements *)
  let al =
    match s with
      A al -> al
    | L l  -> Array.of_list l
  in
  (* init task parameters *)
  let ln = Array.length al in
  let chunksize, ntasks =
    match chunksize with
      None -> (ln/ncores, ncores)
    (* at least one task: a chunksize larger than the input would otherwise
       schedule nothing and silently drop every element *)
    | Some v -> (v, max 1 (ln/v))
  in
  (* one unlinked temporary file per worker, used to send its result back *)
  let fdarr=Array.init ncores (fun _ -> tempfd()) in
  (* setup communication channel with the workers *)
  let sock, sockaddr, _port = setup_server ncores in
  (* spawn the workers *)
  for i = 0 to ncores-1 do
    match Unix.fork() with
      0 ->
        begin
          let d=Unix.gettimeofday() and pid = Unix.getpid() in
          let reschunk=ref opid in
          (* send stdout and stderr to a file to avoid mixing output from different cores *)
          reopen_out stdout (Printf.sprintf "stdout.%d" i);
          reopen_out stderr (Printf.sprintf "stderr.%d" i);
          let (ic,oc)=Unix.open_connection sockaddr in
          (* the parentheses matter: without them [exit 0] belongs to the
             exception handler and only runs when the shutdown fails *)
          let finish () = (try Unix.shutdown_connection ic with _ -> ()); exit 0 in
          while true do
            (* ask for work until we are finished *)
            if debug then Printf.eprintf "Sending Ready token (pid=%d)\n%!" pid;
            Marshal.to_channel oc Ready []; flush oc;
            let token = (Marshal.from_channel ic) in
            if debug then Printf.eprintf "Received token from master (pid=%d)\n%!" pid;
            match token with
            | Finished -> (marshal pid fdarr.(i) (!reschunk:'d); finish ())
            (* the task index gets its own name so that [i] keeps denoting
               this worker in the messages below *)
            | Task task ->
                let lo=task*chunksize in
                let hi=if task=ntasks-1 then ln-1 else (task+1)*chunksize-1 in
                (* iterate in reverse order, to accumulate in the right order *)
                for j=0 to (hi-lo) do
                  try
                    reschunk := op (f (al.(hi-j))) !reschunk
                  with e ->
                    begin
                      let errmsg = Printexc.to_string e
                      in Printf.eprintf "[Parmap] Error at index j=%d in (%d,%d), chunksize=%d of a total of %d got exception %s on core %d \n"
                        j hi lo chunksize (hi-lo+1) errmsg i;
                      (* tell the master, then leave: [finish] does not return *)
                      Marshal.to_channel oc (Error (i,errmsg)) []; flush oc; finish()
                    end
                done;
                Printf.eprintf "[Parmap] Worker on core %d, segment (%d,%d) of data of length %d, chunksize=%d finished in %f seconds\n"
                  i hi lo ln chunksize (Unix.gettimeofday() -. d)
          done
        end
    | -1 ->  Printf.eprintf "[Parmap] Fork error: pid %d; i=%d.\n" (Unix.getpid()) i
    | _ -> ()
  done;
  (* get all connections from the workers *)
  let wfdl =
    List.map
      (fun i ->
        let (fd,_) =
          try Unix.accept sock
          with e -> (Printf.eprintf "[Parmap] error accepting connection from worker %i\n%!" i;
                     raise e)
        in (if debug then Printf.eprintf "[Parmap] accepted connection from worker %d\n%!" i; fd))
      (ext_intv 0 (ncores-1))
  in
  (* every worker is connected: the listening socket is no longer needed *)
  Unix.close sock;
  (* feed workers until all tasks are finished *)
  for i=0 to ntasks-1 do
    if debug then Printf.eprintf "Select for task %d\n%!" i;
    (* block until at least one worker has written something *)
    let (wfd::_),_,_ = Unix.select wfdl [] [] (-1.)
    in match Marshal.from_channel (Unix.in_channel_of_descr wfd) with
      Ready ->
        (if debug then Printf.eprintf "Sending task %d\n%!" i;
         let oc = (Unix.out_channel_of_descr wfd) in
         (Marshal.to_channel oc (Task i) []); flush oc)
    | Error (core,msg) -> (Printf.eprintf "[Parmap]: aborting due to exception on core %d: %s\n" core msg; exit 1)
  done;
  (* will need to add some code to properly close down all the channels incrementally *)
  List.iter (fun wfd -> let oc = (Unix.out_channel_of_descr wfd) in (Marshal.to_channel oc Finished []; flush oc; Unix.close wfd)) wfdl;
  (* wait for all childrens to terminate *)
  for i = 0 to ncores-1 do try ignore(Unix.wait()) with Unix.Unix_error (Unix.ECHILD, _, _) -> () done;
  (* read in all data *)
  let res = ref [] in
  (* iterate in reverse order, to accumulate in the right order *)
  for i = 0 to ncores-1 do
    res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res
  done;
  (* use extLib's tail recursive one *)
  List.fold_right concat !res opid

(* the parallel map function *)

(* [parmap ~ncores ~chunksize f s] applies [f] to the elements of [s] through
   [parmapfold]: each image is consed onto a list accumulator starting from
   the empty list, and the partial lists are joined by list concatenation. *)
let parmap ?(ncores=1) ?chunksize (f:'a -> 'b) (s:'a sequence) : 'b list=
  let cons hd tl = hd :: tl in
  let append left right = left @ right in
  parmapfold ~ncores ?chunksize f s cons [] append

(* the parallel fold function *)

(* [parfold ~ncores ~chunksize op s opid concat] folds [op] over the elements
   of [s] through [parmapfold], using the identity as the mapped function;
   [opid] seeds each accumulator and [concat] merges the partial results. *)
let parfold ?(ncores=1) ?chunksize (op:'a -> 'b -> 'b) (s:'a sequence) (opid:'b) (concat:'b->'b->'b) : 'b=
  let identity x = x in
  parmapfold ~ncores ?chunksize identity s op opid concat
(* back to top *)