Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

swh:1:snp:2d869aa00591d2ac8ec8e7abacdda563d413189d
  • Code
  • Branches (52)
  • Releases (8)
    • Branches
    • Releases
    • HEAD
    • refs/heads/Drup-dune
    • refs/heads/UnixJunkie-patch-1
    • refs/heads/diml-master
    • refs/heads/fastarraymap
    • refs/heads/floatarray
    • refs/heads/gh-pages
    • refs/heads/git-version
    • refs/heads/granularity
    • refs/heads/iterators
    • refs/heads/master
    • refs/heads/merge-requests/1
    • refs/heads/mmap-bigarray-functorised
    • refs/heads/nodispatcher
    • refs/heads/orderpreserving
    • refs/heads/pinning
    • refs/heads/pipes
    • refs/heads/redirect
    • refs/heads/sockets
    • refs/heads/sorted
    • refs/heads/tuning
    • refs/tags/0.9.1
    • refs/tags/0.9.8
    • refs/tags/0.9.9
    • refs/tags/1.0-rc1
    • refs/tags/1.0-rc10
    • refs/tags/1.0-rc11
    • refs/tags/1.0-rc2
    • refs/tags/1.0-rc4
    • refs/tags/1.0-rc5
    • refs/tags/1.0-rc6
    • refs/tags/1.0-rc7-fix-for4.03
    • refs/tags/1.0-rc7-fix-for4.03+1
    • refs/tags/1.0-rc7-fix-for4.03+2
    • refs/tags/1.0-rc7-fix-for4.03+3
    • refs/tags/1.0-rc8
    • refs/tags/1.0-rc9
    • refs/tags/1.0rc3
    • refs/tags/BigArray_MMap_need_fixed_maxsize
    • refs/tags/FixedCornerCases
    • refs/tags/LastVersionWithoutTaskDispatcher
    • refs/tags/LastWithExtLib
    • refs/tags/MajorCodeRework
    • refs/tags/ParMap-Sockets
    • refs/tags/Released-0.9.9
    • refs/tags/SdlMandelsWithFork
    • refs/tags/StableWithoutExtLib
    • refs/tags/Using_Xen_mmap
    • refs/tags/exact_copy_marshal_via_pipe
    • refs/tags/initfinalize-alpha
    • refs/tags/initfinalize-beta
    • refs/tags/list
    • refs/tags/usingpipes
    • OrderPreserving
    • 1.2.3
    • 1.2.2
    • 1.2.1
    • 1.2
    • 1.1.1
    • 1.1
    • 1.0-rc7
  • ea55e9a
  • /
  • parmap.ml
Raw File Download
Permalinks

To reference or cite objects in the Software Heritage archive, use permalinks based on SoftWare Hash IDentifiers (SWHIDs).
Select below the type of the currently browsed object to display its SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
content badge Iframe embedding
swh:1:cnt:101830dbc50ca8dc5ea1c2c6916669aca492a135
directory badge Iframe embedding
swh:1:dir:ea55e9a14de1c4b8812efa2a16c2ba88f8fbcaae
revision badge
swh:1:rev:0b916683ba3d16369f123a594ea0b33f0fa100d5
snapshot badge
swh:1:snp:2d869aa00591d2ac8ec8e7abacdda563d413189d
Citations

This interface generates software citations, provided that the root directory of the browsed objects contains a citation.cff or codemeta.json file.
Select below the type of the currently browsed object to generate a citation for it.

  • content
  • directory
  • revision
  • snapshot
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Tip revision: 0b916683ba3d16369f123a594ea0b33f0fa100d5 authored by Roberto Di Cosmo on 15 November 2011, 16:15:03 UTC
Cleanup the code and the documentation, final version.
Tip revision: 0b91668
parmap.ml
(**************************************************************************)
(* ParMap: a simple library to perform Map computations on a multi-core   *)
(*                                                                        *)
(*  Author(s):  Marco Danelutto, Roberto Di Cosmo                         *)
(*                                                                        *)
(*  This library is free software: you can redistribute it and/or modify  *)
(*  it under the terms of the GNU Lesser General Public License as        *)
(*  published by the Free Software Foundation, either version 3 of the    *)
(*  License, or (at your option) any later version.  A special linking    *)
(*  exception to the GNU Lesser General Public License applies to this    *)
(*  library, see the LICENSE file for more information.                   *)
(**************************************************************************)

open ExtLib


(* Input collections accepted by the library: a list ([L]) or an array ([A]). *)
type 'a sequence =
  | L of 'a list
  | A of 'a array

(* When [true], extra tracing messages are printed on stderr (which the
   workers redirect to per-core files). *)
let debug = true

(* utils *)

(* [ext_intv startv endv] is the list of every integer between [startv] and
   [endv] inclusive, in increasing order, whichever bound is the larger one.
   With the Batteries list comprehension it would read
   [? a | a <- startv--endv]. *)
let ext_intv startv endv =
  let lo = min startv endv and hi = max startv endv in
  (* cons from the upper bound downwards, stopping right after [lo] *)
  let rec build acc k =
    let acc = k :: acc in
    if k = lo then acc else build acc (k - 1)
  in
  build [] hi
;;

(* [index_of e l] is the 0-based position of the first element of [l] that is
   structurally equal to [e].
   @raise Not_found if no element of [l] equals [e]. *)
let index_of e l =
  let rec scan pos = function
    | [] -> raise Not_found
    | x :: rest -> if x = e then pos else scan (pos + 1) rest
  in
  scan 0 l
;;

(* [reopen_out outchan filename] redirects the descriptor behind [outchan] to
   [filename], which is created or truncated (mode 0o666, subject to the
   umask).  This emulates C freopen, following Xavier's suggestion on the
   OCaml mailing list.  [outchan] is flushed first so that pending output still
   goes to the old destination.
   @raise Unix.Unix_error if the file cannot be opened or the descriptor
   cannot be duplicated. *)
let reopen_out outchan filename =
  flush outchan;
  let fd1 = Unix.descr_of_out_channel outchan in
  let fd2 =
    Unix.openfile filename [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC] 0o666 in
  (* release the temporary descriptor even when dup2 fails, so it cannot leak *)
  (try Unix.dup2 fd2 fd1 with e -> Unix.close fd2; raise e);
  Unix.close fd2

(* [unmarshal fd] maps the whole file behind [fd] as a shared char bigarray.
   A dimension of -1 makes the size come from the current file size.  It then
   decodes the value stored at offset 0 with [Bytearray.unmarshal] and closes
   [fd].  The descriptor is closed even if mapping or decoding raises; the
   exception is then re-raised. *)
let unmarshal fd =
  let res =
    try
      let a =
        Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true (-1) in
      Bytearray.unmarshal a 0
    with e -> Unix.close fd; raise e
  in
  Unix.close fd;
  res


(* [marshal fd v] maps a window of [1 lsl 32] bytes (4 GiB) of the file
   behind [fd] as a shared char bigarray.  Per the Bigarray.map_file
   documentation, the file is grown if it is smaller than that.  It then
   serialises [v] at offset 0 with [Bytearray.marshal_to_buffer], allowing
   closures ([Marshal.Closures]), and closes [fd].  The descriptor is closed
   even if mapping or marshalling raises; the exception is then re-raised.
   NOTE(review): [1 lsl 32] only makes sense with 63-bit native ints (64-bit
   platforms); the window also presumably caps the size of a marshalled
   result -- confirm. *)
let marshal fd v =
  let huge_size = 1 lsl 32 in
  (try
     let ba =
       Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true huge_size
     in
     ignore (Bytearray.marshal_to_buffer ba 0 v [Marshal.Closures])
   with e -> Unix.close fd; raise e);
  Unix.close fd

(* [tempfd ()] returns a read/write descriptor on a fresh temporary file.  The
   file is created by [Filename.temp_file] with prefix mmap and suffix TMP,
   and its name is unlinked right after opening: it only lives on through the
   descriptor.  If opening or unlinking raises, the name is unlinked before the
   exception is re-raised. *)
let tempfd () =
  let path = Filename.temp_file "mmap" "TMP" in
  let open_anonymous () =
    let descr = Unix.openfile path [Unix.O_RDWR; Unix.O_CREAT] 0o600 in
    Unix.unlink path;
    descr
  in
  try open_anonymous ()
  with exn ->
    Unix.unlink path;
    raise exn

(* [simplemapper ncores compute opid al collect] splits [al] statically into
   [ncores] contiguous segments.  Each segment has [Array.length al / ncores]
   elements, and the last one also takes the remainder.  One worker is forked
   per segment.

   Worker [i] evaluates [compute al lo hi opid exc_handler] on its segment and
   marshals the result into its own temporary file.  If [compute] reports an
   exception through [exc_handler], the worker prints it on stderr and exits
   with status 1.

   The parent reaps exactly its own workers, reads the [ncores] partial
   results back in segment order and returns [collect results].  If a worker
   did not exit with status 0, the parent prints a message and exits with
   status 1 instead of decoding a result that was never written. *)
let simplemapper ncores compute opid al collect =
  (* flush pending output so that it is not duplicated in the children *)
  flush_all ();
  let ln = Array.length al in
  let chunksize = ln / ncores in
  (* one temporary file per worker, to carry its result back *)
  let fdarr = Array.init ncores (fun _ -> tempfd ()) in
  let pids = Array.make ncores 0 in
  for i = 0 to ncores - 1 do
    (* Unix.fork raises Unix_error on failure; it never returns -1 *)
    match Unix.fork () with
    | 0 ->
        let lo = i * chunksize in
        let hi = if i = ncores - 1 then ln - 1 else (i + 1) * chunksize - 1 in
        let exc_handler e j = (* handle an exception at index j *)
          let errmsg = Printexc.to_string e in
          Printf.eprintf "[Parmap] Error at index j=%d in (%d,%d), chunksize=%d of a total of %d got exception %s on core %d \n%!"
            j lo hi chunksize (hi - lo + 1) errmsg i;
          exit 1
        in
        let v = compute al lo hi opid exc_handler in
        marshal fdarr.(i) v;
        exit 0
    | pid -> pids.(i) <- pid
  done;
  (* wait for one of our own children, retrying if interrupted by a signal;
     [None] when the status cannot be obtained (no such child) *)
  let rec wait_for pid =
    try Some (snd (Unix.waitpid [] pid)) with
    | Unix.Unix_error (Unix.EINTR, _, _) -> wait_for pid
    | Unix.Unix_error (Unix.ECHILD, _, _) -> None
  in
  let failed = ref false in
  Array.iter
    (fun pid ->
      match wait_for pid with
      | Some (Unix.WEXITED 0) | None -> ()
      | Some _ -> failed := true)
    pids;
  if !failed then begin
    Printf.eprintf "[Parmap]: aborting because a worker process failed\n%!";
    exit 1
  end;
  (* read the results back; iterate downwards so that consing yields them in
     segment order *)
  let res = ref [] in
  for i = ncores - 1 downto 0 do
    res := (unmarshal fdarr.(i) : 'd) :: !res
  done;
  collect !res
;;


(* a more sophisticated mapper function, with automatic load balancing *)

(* Messages from a worker to the master: [Ready i] asks for a task on behalf
   of worker [i]; [Error (i, msg)] reports that worker [i] raised the exception
   printed as [msg].  Constructor order is part of the marshalled format. *)
type msg_up =
  | Ready of int
  | Error of int * string

(* Messages from the master to a worker: [Task n] assigns chunk number [n];
   [Finished] tells the worker to hand back its result and stop. *)
type msg_down =
  | Finished
  | Task of int


(* [setup_children_chans pipeup pipedown fdarr i] prepares the process of
   worker [i] right after the fork.
   - It calls [Setcore.setcore i] (presumably binding the process to core [i]
     -- not visible here).
   - It sends stdout and stderr to the files stdout.<i> and stderr.<i>, so
     that the output of different cores does not get mixed.
   - It closes the master ends of its own two pipes and wraps its ends as
     channels.
   It returns [(receive, signal, return, finish, pid)]:
   - [receive ()] reads the next marshalled message sent by the master;
   - [signal v] marshals [v] to the master and flushes;
   - [return v] marshals the final result [v] into [fdarr.(i)];
   - [finish ()] closes both channels and exits with status 0;
   - [pid] is the process id of the worker. *)
let setup_children_chans pipeup pipedown fdarr i =
  Setcore.setcore i;
  reopen_out stdout (Printf.sprintf "stdout.%d" i);
  reopen_out stderr (Printf.sprintf "stderr.%d" i);
  (* close the master ends of this worker's pipes *)
  Unix.close (snd pipedown.(i));
  Unix.close (fst pipeup.(i));
  let pid = Unix.getpid () in
  let ic = Unix.in_channel_of_descr (fst pipedown.(i))
  and oc = Unix.out_channel_of_descr (snd pipeup.(i)) in
  let receive () = Marshal.from_channel ic in
  let signal v = Marshal.to_channel oc v []; flush oc in
  let return v =
    let d = Unix.gettimeofday () in
    marshal fdarr.(i) v;
    if debug then
      Printf.eprintf "[Parmap]: worker elapsed %f in marshalling\n%!"
        (Unix.gettimeofday () -. d)
  in
  let finish () =
    if debug then Printf.eprintf "Shutting down (pid=%d)\n%!" pid;
    (* best-effort close of each channel on its own: a failure closing one
       must not prevent closing the other *)
    (try close_in ic with Sys_error _ -> ());
    (try close_out oc with Sys_error _ -> ());
    exit 0
  in
  receive, signal, return, finish, pid
;;

(* [mapper ncores ~chunksize compute opid al collect] is the parametric
   parallel skeleton shared by every entry point of the library.

   - [compute al lo hi acc exc_handler] processes the elements of [al] with
     indices [lo..hi], combines them into [acc] and returns the new
     accumulator.  It reports exceptions by calling [exc_handler exn index].
   - [opid] is the initial accumulator of every worker.
   - [collect] receives the list of the [ncores] per-worker results, in
     worker order, and builds the final value.

   When [chunksize] is [None], or the requested chunk size yields no more
   tasks than cores, the work is split statically by [simplemapper].

   Otherwise [ncores] workers are forked and request tasks ([Ready]) over
   pipes.  The master hands out chunk numbers ([Task n]) until all
   [ln / chunksize] chunks are assigned; the last chunk also takes the
   remainder.  It then sends [Finished], and each worker marshals its
   accumulator into its own temporary file and exits.  If a worker reports an
   [Error], the master prints it and exits with status 1.

   @raise Invalid_argument if [chunksize] is [Some v] with [v <= 0]. *)
let mapper ncores ~chunksize compute opid al collect =
  let ln = Array.length al in
  match chunksize with
  | None -> simplemapper ncores compute opid al collect
  | Some v when v <= 0 -> invalid_arg "Parmap: chunksize must be positive"
  | Some v when ln / v <= ncores ->
      (* No more tasks than cores, so dynamic load balancing cannot help.
         Moreover, with [ln / v = 0] the dispatcher below would hand out no
         task at all and silently drop every element. *)
      simplemapper ncores compute opid al collect
  | Some v ->
      let chunksize = v and ntasks = ln / v in
      (* flush pending output so that it is not duplicated in the children *)
      flush_all ();
      (* one temporary file per worker, to carry its result back *)
      let fdarr = Array.init ncores (fun _ -> tempfd ()) in
      (* pipedown: master -> worker; pipeup: worker -> master *)
      let pipedown = Array.init ncores (fun _ -> Unix.pipe ()) in
      let pipeup = Array.init ncores (fun _ -> Unix.pipe ()) in
      (* spawn the workers; Unix.fork raises on failure, it never returns -1 *)
      for i = 0 to ncores - 1 do
        match Unix.fork () with
        | 0 ->
            let d = Unix.gettimeofday () in
            let receive, signal, return, finish, pid =
              setup_children_chans pipeup pipedown fdarr i in
            let reschunk = ref opid in
            (* compute chunk number n, accumulating into !reschunk *)
            let computetask n =
              let lo = n * chunksize in
              let hi = if n = ntasks - 1 then ln - 1 else (n + 1) * chunksize - 1 in
              let exc_handler e j = (* handle an exception at index j *)
                let errmsg = Printexc.to_string e in
                Printf.eprintf "[Parmap] Error at index j=%d in (%d,%d), chunksize=%d of a total of %d got exception %s on core %d \n%!"
                  j lo hi chunksize (hi - lo + 1) errmsg i;
                signal (Error (i, errmsg));
                finish ()
              in
              reschunk := compute al lo hi !reschunk exc_handler;
              if debug then
                Printf.eprintf "[Parmap] Worker on core %d (pid=%d), segment (%d,%d) of data of length %d, chunksize=%d finished in %f seconds\n%!"
                  i pid lo hi ln chunksize (Unix.gettimeofday () -. d)
            in
            (* ask for work until told to finish *)
            while true do
              signal (Ready i);
              match receive () with
              | Finished -> return (!reschunk : 'd); finish ()
              | Task n -> computetask n
            done
        | _ -> ()
      done;

      (* close the pipe ends that only the workers use *)
      Array.iter (fun (rfd, _) -> Unix.close rfd) pipedown;
      Array.iter (fun (_, wfd) -> Unix.close wfd) pipeup;

      let wfdl = List.map fst (Array.to_list pipeup) in
      let ocs = Array.map (fun (_, wfd) -> Unix.out_channel_of_descr wfd) pipedown in
      let ics = Array.map (fun (rfd, _) -> Unix.in_channel_of_descr rfd) pipeup in
      let send w m =
        let oc = ocs.(w) in
        Marshal.to_channel oc m [];
        flush oc
      in
      let abort core msg =
        Printf.eprintf "[Parmap]: aborting due to exception on core %d: %s\n%!" core msg;
        exit 1
      in

      (* feed workers until all tasks are assigned *)
      for i = 0 to ntasks - 1 do
        if debug then Printf.eprintf "Select for task %d (ncores=%d, ntasks=%d)\n%!" i ncores ntasks;
        let readyl, _, _ = Unix.select wfdl [] [] (-1.) in
        (* with an infinite timeout, select returns at least one ready fd *)
        let w = index_of (List.hd readyl) wfdl in
        match Marshal.from_channel ics.(w) with
        | Ready w ->
            if debug then Printf.eprintf "Sending task %d to worker %d\n%!" i w;
            send w (Task i)
        | Error (core, msg) -> abort core msg
      done;

      (* Each worker still has exactly one message that the loop above did
         not read.  It is the first [Ready] if the worker never got a task,
         otherwise the [Ready] or [Error] it sent after its last task.
         Consume it before sending [Finished], so that a failure in one of the
         final tasks aborts the run instead of being silently ignored. *)
      Array.iter
        (fun ic ->
          match Marshal.from_channel ic with
          | Ready _ -> ()
          | Error (core, msg) -> abort core msg)
        ics;

      (* send the termination token to all workers *)
      Array.iter
        (fun oc -> Marshal.to_channel oc Finished []; flush oc; close_out oc)
        ocs;

      (* wait for all children to terminate *)
      for _i = 0 to ncores - 1 do
        try ignore (Unix.wait ()) with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
      done;
      Array.iter close_in ics;

      (* read the results back; iterate downwards so that consing yields them
         in worker order *)
      let res = ref [] in
      for i = ncores - 1 downto 0 do
        res := (unmarshal fdarr.(i) : 'd) :: !res
      done;
      collect !res
;;


(* [parmapfold ~ncores ?chunksize f s op opid concat] folds [op] over the
   images by [f] of the elements of [s], in parallel.  Each worker folds its
   share of the elements starting from [opid].  The per-worker results are
   then combined with [List.fold_right concat results opid].  [chunksize] is
   forwarded to [mapper]. *)
let parmapfold ?(ncores=1) ?(chunksize) (f:'a -> 'b) (s:'a sequence) (op:'b->'c->'c) (opid:'c) (concat:'c->'c->'c) : 'c=
  (* work on an array so that elements are accessed in constant time *)
  let al = match s with A al -> al | L l -> Array.of_list l in
  let compute al lo hi previous exc_handler =
    (* Walk the segment from [hi] down to [lo], so that [op] accumulates the
       elements as a right fold over the segment would.  On failure, report
       the absolute index of the offending element. *)
    let r = ref previous in
    for j = hi downto lo do
      try r := op (f (Array.unsafe_get al j)) !r
      with e -> exc_handler e j
    done;
    !r
  in
  mapper ncores ~chunksize compute opid al (fun r -> List.fold_right concat r opid)
;;

(* [parmap ~ncores ?chunksize f s] applies [f] to every element of [s] in
   parallel and returns the results as a list.  The per-worker result lists
   are concatenated in worker order.  [chunksize] is forwarded to [mapper]. *)
let parmap ?(ncores=1) ?chunksize (f:'a -> 'b) (s:'a sequence) : 'b list=
  (* work on an array so that elements are accessed in constant time *)
  let al = match s with A al -> al | L l -> Array.of_list l in
  let compute al lo hi previous exc_handler =
    (* on failure, report the absolute index of the offending element *)
    let f' j = try f (Array.unsafe_get al j) with e -> exc_handler e j in
    (* Walk from [hi] down to [lo], consing onto [previous], so that the
       results of the segment come out in order in front of [previous].  An
       empty segment ([hi < lo], e.g. empty input) returns [previous]
       unchanged instead of reading out of bounds. *)
    let rec aux acc j = if j < lo then acc else aux (f' j :: acc) (j - 1) in
    aux previous hi
  in
  mapper ncores ~chunksize compute [] al (fun r -> ExtLib.List.concat r)
;;

(* [parfold ~ncores ?chunksize op s opid concat] folds [op] over the elements
   of [s] in parallel.  It is [parmapfold] with the identity as mapping
   function. *)
let parfold ?(ncores=1) ?chunksize (op:'a -> 'b -> 'b) (s:'a sequence) (opid:'b) (concat:'b->'b->'b) : 'b=
  let identity x = x in
  parmapfold ~ncores ?chunksize identity s op opid concat
;;


(* [map_intv lo hi f a] is the array [[| f a.(lo); ...; f a.(hi) |]], with
   [f] applied in increasing index order.  It is [[||]] when [hi < lo].
   Indices are not bounds-checked: when [lo <= hi], the caller must ensure
   [0 <= lo] and [hi < Array.length a]. *)
let map_intv lo hi f a =
  let len = hi - lo + 1 in
  if len <= 0 then [||]
  (* Array.init applies its function in order 0..len-1; it replaces the
     deprecated (and, in OCaml 5, removed) Array.create *)
  else Array.init len (fun i -> f (Array.unsafe_get a (lo + i)))

(* [array_parmap ~ncores ?chunksize f al] maps [f] over the array [al] in
   parallel.  Each worker puts the results of every segment it processes in
   front of its accumulator, and the per-worker arrays are concatenated in
   worker order.  A failure is reported with the first index of the segment
   being processed.  [chunksize] is forwarded to [mapper]. *)
let array_parmap ?(ncores=1) ?chunksize (f:'a -> 'b) (al:'a array) : 'b array=
  let compute arr lo hi acc on_error =
    try Array.append (map_intv lo hi f arr) acc
    with e -> on_error e lo
  in
  mapper ncores ~chunksize compute [||] al Array.concat
;;

(* [array_float_parmap ~ncores f al] computes the floats [f al.(i)] on
   [ncores] forked workers, split statically by [simplemapper].  The workers
   write their results directly into a float64 bigarray that is a shared
   mapping of /dev/zero.  The parent then copies that bigarray into a regular
   float array.
   NOTE(review): this relies on a shared mapping of /dev/zero staying shared
   across fork (true on Linux) -- confirm on other systems.
   An empty input returns [||] without mapping anything or forking.  The
   descriptor on /dev/zero is closed even if mapping or the workers fail. *)
let array_float_parmap ?(ncores=1) (f:'a -> float) (al:'a array) : float array=
  let size = Array.length al in
  if size = 0 then [||]
  else begin
    let fd = Unix.openfile "/dev/zero" [Unix.O_RDWR; Unix.O_CREAT] 0o600 in
    let res =
      try
        let arr_out =
          Bigarray.Array1.map_file fd Bigarray.float64 Bigarray.c_layout true size in
        let compute _ lo hi _ exc_handler =
          for i = lo to hi do
            (* report the index of the element whose computation failed *)
            try Bigarray.Array1.unsafe_set arr_out i (f al.(i))
            with e -> exc_handler e i
          done
        in
        simplemapper ncores compute () al (fun _ -> ());
        Array.init size (fun i -> Bigarray.Array1.unsafe_get arr_out i)
      with e -> Unix.close fd; raise e
    in
    Unix.close fd;
    res
  end
;;


Software Heritage — Copyright (C) 2015–2025, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Contact— JavaScript license information— Web API

back to top