We are hiring ! See our job offers.
https://gitorious.org/parmap/parmap.git
Raw File
Tip revision: 802807eeef5560ca1ddb5bebdfaeb31f0854e794 authored by Roberto Di Cosmo on 19 July 2011, 16:57:24 UTC
ParMap using sockets on otherbranch?
Tip revision: 802807e
parmap.ml
(**************************************************************************)
(* ParMap: a simple library to perform Map computations on a multi-core   *)
(*                                                                        *)
(*  Author(s):  Marco Danelutto, Roberto Di Cosmo                         *)
(*                                                                        *)
(*  This library is free software: you can redistribute it and/or modify  *)
(*  it under the terms of the GNU Lesser General Public License as        *)
(*  published by the Free Software Foundation, either version 3 of the    *)
(*  License, or (at your option) any later version.  A special linking    *)
(*  exception to the GNU Lesser General Public License applies to this    *)
(*  library, see the COPYING file for more information.                   *)
(**************************************************************************)

open Common
open Util

(* the parallel map function *)


let parmap (f:'a -> 'b) (l:'a list) ?(ncores=1) : 'b list=
  let t = Timer.create "collection" in
  Timer.enable "collection";
  let tc = Timer.create "computation" in
  Timer.enable "computation";
  let tm = Timer.create "marshalling" in
  Timer.enable "marshalling";
  Timer.start tc;
  (* flush everything *)
  flush stdout; flush stderr;
  (* init task parameters *)
  let ln = List.length l in
  let chunksize = ln/ncores in
  let port=4000 in
  let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
  let sockaddr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
  Unix.setsockopt sock Unix.SO_REUSEADDR true;
  Unix.setsockopt sock Unix.SO_KEEPALIVE true;
  Unix.setsockopt_int sock Unix.SO_SNDBUF 1000000;
  Unix.setsockopt_int sock Unix.SO_RCVBUF 1000000;
  Unix.bind sock sockaddr;
  Unix.listen sock ncores;
  for i = 0 to ncores-1 do
       match Unix.fork() with
      0 -> 
	begin
          let pid = Unix.getpid() in
          let reschunk=ref [] in
          let limit=if i=ncores-1 then ln-1 else (i+1)*chunksize-1 in
          for j=i*chunksize to limit do
	    try 
              reschunk := (f (List.nth l j))::!reschunk
	    with _ -> (Printf.printf "Error: j=%d\n" j)
          done;
          Printf.eprintf "Process %d done computing\n" pid; flush stderr;
          (* now connect back to the parent, and send the results *) 
          let (ic,oc)=Unix.open_connection sockaddr in
          Marshal.to_channel oc (i,List.rev !reschunk) [Marshal.Closures];
          Printf.eprintf "Process %d has marshaled result\n" pid;
          exit 0
	end
    | -1 ->  Printf.eprintf "Fork error: pid %d; i=%d.\n" (Unix.getpid()) i; 
    | pid -> ()
  done;


  let res = Array.init ncores (fun _ -> []) in

  for i = 0 to ncores-1 do
    let (fd,saddr) = try Unix.accept sock with e -> (Printf.eprintf "Error in accept"; raise e) in (* accepting the connection *)
    let ic=Unix.in_channel_of_descr fd in 
    let (n,v) = Marshal.from_channel ic in
    res.(n)<-v
  done;

  (* wait for all childrens *)
  for i = 0 to ncores-1 do try ignore(Unix.wait()) with Unix.Unix_error (Unix.ECHILD, _, _) -> () done;
  Timer.stop tc ();  Timer.pp_timer Format.std_formatter tc;
  Unix.shutdown sock Unix.SHUTDOWN_ALL;
  Unix.close sock;
  List.flatten (Array.to_list res) 
;;


(* example:
List.iter (fun n -> Printf.printf "%d\n" n) (parmap (fun x -> x+1) [1;2;3;4;5;6;7;8;9;10;11;12;13] ~ncores:4);;
 *)
back to top