Skip to main content
  • Home
  • Development
  • Documentation
  • Donate
  • Operational login
  • Browse the archive

swh logo
SoftwareHeritage
Software
Heritage
Archive
Features
  • Search

  • Downloads

  • Save code now

  • Add forge now

  • Help

https://gitorious.org/parmap/parmap.git
30 March 2016, 07:40:04 UTC
  • Code
  • Branches (29)
  • Releases (0)
  • Visits
    • Branches
    • Releases
    • HEAD
    • refs/heads/floatarray
    • refs/heads/granularity
    • refs/heads/iterators
    • refs/heads/master
    • refs/heads/mmap-bigarray-functorised
    • refs/heads/nodispatcher
    • refs/heads/pinning
    • refs/heads/pipes
    • refs/heads/redirect
    • refs/heads/sockets
    • refs/merge-requests/1
    • refs/merge-requests/2
    • refs/tags/0.9.1
    • refs/tags/0.9.8
    • refs/tags/0.9.9
    • refs/tags/BigArray_MMap_need_fixed_maxsize
    • refs/tags/FixedCornerCases
    • refs/tags/LastVersionWithoutTaskDispatcher
    • refs/tags/LastWithExtLib
    • refs/tags/MajorCodeRework
    • refs/tags/ParMap-Sockets
    • refs/tags/Released-0.9.9
    • refs/tags/SdlMandelsWithFork
    • refs/tags/StableWithoutExtLib
    • refs/tags/Using_Xen_mmap
    • refs/tags/exact_copy_marshal_via_pipe
    • refs/tags/list
    • refs/tags/usingpipes
    • 0064fbd0ad69de205ea6ec6999f3d3895e9442c2
    No releases to show
  • 5512fa7
  • /
  • parmap.ml
Raw File Download Save again
Take a new snapshot of a software origin

If the archived software origin currently browsed is not synchronized with its upstream version (for instance when new commits have been issued), you can explicitly request Software Heritage to take a new snapshot of it.

Use the form below to proceed. Once a request has been submitted and accepted, it will be processed as soon as possible. You can then check its processing state by visiting this dedicated page.
swh spinner

Processing "take a new snapshot" request ...

To reference or cite the objects present in the Software Heritage archive, permalinks based on SoftWare Hash IDentifiers (SWHIDs) must be used.
Select below a type of object currently browsed in order to display its associated SWHID and permalink.

  • content
  • directory
  • revision
  • snapshot
origin badgecontent badge
swh:1:cnt:d5214ff9562a1fe78db51944506ba48c20de3379
origin badgedirectory badge
swh:1:dir:5512fa77668338bdb6f673c32e15a81615fe5c68
origin badgerevision badge
swh:1:rev:0064fbd0ad69de205ea6ec6999f3d3895e9442c2
origin badgesnapshot badge
swh:1:snp:78209702559384ee1b5586df13eca84a5123aa82

This interface enables to generate software citations, provided that the root directory of browsed objects contains a citation.cff or codemeta.json file.
Select below a type of object currently browsed in order to generate citations for them.

  • content
  • directory
  • revision
  • snapshot
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Generate software citation in BibTex format (requires biblatex-software package)
Generating citation ...
Tip revision: 0064fbd0ad69de205ea6ec6999f3d3895e9442c2 authored by Roberto Di Cosmo on 10 January 2012, 20:40:45 UTC
Added Makefile for OCaml 3.11
Tip revision: 0064fbd
parmap.ml
(**************************************************************************)
(* ParMap: a simple library to perform Map computations on a multi-core   *)
(*                                                                        *)
(*  Author(s):  Marco Danelutto, Roberto Di Cosmo                         *)
(*                                                                        *)
(*  This library is free software: you can redistribute it and/or modify  *)
(*  it under the terms of the GNU Lesser General Public License as        *)
(*  published by the Free Software Foundation, either version 2 of the    *)
(*  License, or (at your option) any later version.  A special linking    *)
(*  exception to the GNU Lesser General Public License applies to this    *)
(*  library, see the LICENSE file for more information.                   *)
(**************************************************************************)

open ExtLib

(* OS related constants *)

(* a size deemed large enough for mmapping a file that holds even huge
   result data: 2^32 when the native word is 64 bits wide, 2^26 otherwise *)
let huge_size =
  match Sys.word_size with
  | 64 -> 1 lsl 32
  | _ -> 1 lsl 26
;;

(* sequence type, subsuming lists and arrays:
   [L l] wraps a list and [A a] wraps an array, so that the parallel
   combinators below can accept either container through one argument *)

type 'a sequence = L of 'a list | A of 'a array;;

(* switch read by [debug]: its messages are printed only when this is true *)
let debug_enabled = ref false;;

(* directory in which [reopen_out] creates its files; the default name
   embeds the pid of the process that evaluates this definition *)
let log_dir = ref (Printf.sprintf "/tmp/.parmap.%d" (Unix.getpid ()))

(* printf-like logger: the formatted message goes to stderr with a
   [Parmap] prefix when debug_enabled is set, and is dropped otherwise.
   The flag is read when the format is supplied, before the arguments. *)
let debug fmt =
  let sink =
    if !debug_enabled then (fun msg -> Format.eprintf "[Parmap]: %s@." msg)
    else ignore
  in
  Printf.kprintf sink fmt

(* printf-like logger that always prints the formatted message on stderr,
   prefixed with [Parmap] and followed by a newline and a flush *)
let info fmt =
  let emit msg = Format.eprintf "[Parmap]: %s@." msg in
  Printf.kprintf emit fmt

(* utils *)

(* would be [? a | a <- startv--endv] using list comprehension from Batteries *)

(* [ext_intv startv endv] is the increasing list of all integers between
   the two bounds, both included; the bounds may be given in either order *)
let ext_intv startv endv =
  let lo = min startv endv
  and hi = max startv endv in
  (* walk down from hi, consing as we go, so the list comes out increasing *)
  let rec build acc n =
    if n = lo then n :: acc else build (n :: acc) (n - 1)
  in
  build [] hi
;;

(* freopen emulation, from Xavier's suggestion on OCaml mailing list *)

(* [reopen_out outchan fname] flushes [outchan] and redirects its file
   descriptor to the file [fname] inside the directory !log_dir, creating
   that directory when needed and truncating the file.
   Raises Unix.Unix_error when the directory or the file cannot be
   created or the descriptor cannot be duplicated. *)
let reopen_out outchan fname =
  flush outchan;
  (* several freshly forked workers may get here at the same time and all
     see the directory as missing: losing that race is not an error *)
  if not (Sys.file_exists !log_dir) then begin
    try Unix.mkdir !log_dir 0o777
    with Unix.Unix_error (Unix.EEXIST, _, _) -> ()
  end;
  let filename = Filename.concat !log_dir fname in
  let fd1 = Unix.descr_of_out_channel outchan in
  let fd2 = Unix.openfile filename [Unix.O_WRONLY; Unix.O_CREAT; Unix.O_TRUNC] 0o666 in
  (* do not leak the freshly opened descriptor if the redirection fails *)
  (try Unix.dup2 fd2 fd1 with e -> Unix.close fd2; raise e);
  Unix.close fd2

(* unmarshal from a mmap seen as a bigarray *)

(* [unmarshal fd] maps the whole file behind [fd] as a char Bigarray,
   decodes the value stored at offset 0 with Bytearray.unmarshal and
   closes [fd].  The descriptor is closed even when mapping or decoding
   raises (for instance on a file no worker wrote to); the exception is
   then re-raised to the caller. *)
let unmarshal fd =
  let res =
    try
      let a = Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true (-1) in
      Bytearray.unmarshal a 0
    with e -> Unix.close fd; raise e
  in
  Unix.close fd;
  res


(* marshal to a mmap seen as a bigarray *)

(* System dependent notes:
    - on Linux kernels, we might allocate a mmapped memory area of huge_size and marshal into it directly

       let ba = Bigarray.Array1.map_file fd Bigarray.char Bigarray.c_layout true huge_size in
       ignore(Bytearray.marshal_to_buffer ba 0 v [Marshal.Closures]);
       Unix.close fd

    - to be compatible with other systems, notably Mac OS X, which insists on allocating *all*
      of the declared memory area even for a sparse file, we must choose a less efficient approach:
       * marshal the value v to a string s, and compute its size
       * allocate a mmap of that exact size,
       * copy the string to that mmap
      this allocates twice as much memory, and incurs an extra copy of the value v 
 *)

(* [marshal fd v] serialises [v] (closures included) to a string and hands
   that string to Bytearray.mmap_of_string together with [fd]; the value
   returned by that call is discarded *)
let marshal fd v =
  let payload = Marshal.to_string v [Marshal.Closures] in
  let _mapped = Bytearray.mmap_of_string fd payload in
  ()

(* create a shadow file descriptor *)

(* [tempfd ()] creates a fresh temporary file, opens it read-write and
   unlinks its name right away, so that only the returned descriptor
   refers to it.  If opening or unlinking fails, the name is unlinked
   and the exception is re-raised. *)
let tempfd () =
  let path = Filename.temp_file "mmap" "TMP" in
  let descr =
    try
      let opened = Unix.openfile path [Unix.O_RDWR; Unix.O_CREAT] 0o600 in
      Unix.unlink path;
      opened
    with e -> Unix.unlink path; raise e
  in
  descr

(* a simple mapper function that computes 1/nth of the data on each of the n cores in one iteration *)

(* [simplemapper ncores compute opid al collect] forks [ncores] children.
   Child [i] runs [compute al lo hi opid exc_handler] on its slice
   (lo,hi) of [al] (the last child also takes the remainder), marshals the
   result into its own temporary file and exits.  The parent waits for the
   children, unmarshals the [ncores] results in core order and passes the
   resulting list to [collect]. *)
let simplemapper ncores compute opid al collect =
  (* flush everything, so that buffered output is not duplicated in the children *)
  flush_all ();
  (* init task parameters *)
  let total = Array.length al in
  let chunksize = total / ncores in
  (* one descriptor per child, to carry its result back *)
  let fdarr = Array.init ncores (fun _ -> tempfd ()) in
  (* call the GC before forking *)
  Gc.compact ();
  (* body of child number i: never returns, always ends in exit *)
  let run_child i =
    let lo = i * chunksize in
    let hi = if i = ncores - 1 then total - 1 else (i + 1) * chunksize - 1 in
    (* handle an exception at index j: report it and abort the child *)
    let exc_handler e j =
      info "error at index j=%d in (%d,%d), chunksize=%d of a total of %d got exception %s on core %d \n%!"
        j lo hi chunksize (hi - lo + 1) (Printexc.to_string e) i;
      exit 1
    in
    let v = compute al lo hi opid exc_handler in
    marshal fdarr.(i) v;
    exit 0
  in
  (* spawn children *)
  for i = 0 to ncores - 1 do
    match Unix.fork () with
    | 0 -> run_child i
    | -1 -> info "fork error: pid %d; i=%d" (Unix.getpid ()) i
    | _ -> ()
  done;
  (* wait for all children; ECHILD only means there is nobody left to wait for *)
  for _k = 1 to ncores do
    try ignore (Unix.wait ())
    with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
  done;
  (* read in all data, last core first, so that consing yields core order *)
  let res = ref [] in
  for i = ncores - 1 downto 0 do
    res := ((unmarshal fdarr.(i)) : 'd) :: !res
  done;
  (* collect all results *)
  collect !res
;;


(* a more sophisticated mapper function, with automatic load balancing *)

(* the type of messages exchanged between master and workers *)

(* worker to master: [Ready i] is sent by worker [i] each time it asks for
   work; [Error (i, msg)] reports an exception raised on worker [i], [msg]
   being its printed form *)
type msg_to_master = Ready of int | Error of int * string;;
(* master to worker: [Task n] asks the worker to compute chunk number [n];
   [Finished] tells it to hand back its result and shut down *)
type msg_to_worker = Finished | Task of int;;


(* [setup_children_chans oc pipedown fdarr i] prepares worker [i] right
   after the fork: it calls Setcore.setcore, redirects stdout and stderr to
   per-worker files, closes the write end of its downward pipe and returns
   the tuple (receive, signal, return, finish, pid) where
     - receive () reads one marshalled message from the downward pipe,
     - signal v   marshals [v] on [oc] and flushes,
     - return v   marshals [v] into the result file of this worker,
     - finish ()  closes the channels and exits with status 0,
     - pid        is the pid of the worker. *)
let setup_children_chans oc pipedown fdarr i =
  Setcore.setcore i;
  (* send stdout and stderr to a file to avoid mixing output from different cores *)
  reopen_out stdout (Printf.sprintf "stdout.%d" i);
  reopen_out stderr (Printf.sprintf "stderr.%d" i);
  (* close the other end of the pipe and convert my end to an input channel *)
  let down_rd, down_wr = pipedown.(i) in
  Unix.close down_wr;
  let pid = Unix.getpid () in
  let ic = Unix.in_channel_of_descr down_rd in
  let receive () = Marshal.from_channel ic in
  let signal v =
    Marshal.to_channel oc v [];
    flush oc
  in
  let return v =
    let started = Unix.gettimeofday () in
    marshal fdarr.(i) v;
    debug "worker elapsed %f in marshalling" (Unix.gettimeofday () -. started)
  in
  let finish () =
    debug "shutting down (pid=%d)\n%!" pid;
    (* best effort: a failure while closing must not prevent the exit *)
    (try close_in ic; close_out oc with _ -> ());
    exit 0
  in
  (receive, signal, return, finish, pid)
;;

(* parametric mapper primitive that captures the parallel structure *)

(* [mapper ncores ~chunksize compute opid al collect] runs [compute] over
   the array [al] on [ncores] forked workers and passes the list of the
   per-worker results, in worker order, to [collect].

   - without a chunk size, or when the chunk size yields exactly [ncores]
     chunks, the work is split statically by simplemapper;
   - otherwise the input is cut into tasks of [chunksize] elements (the last
     task also takes the remainder) and the master hands a task to
     whichever worker signals that it is ready.

   The number of tasks is never less than 1 for a non-empty input: a chunk
   size larger than the input (or a negative one) gives a single task that
   covers the whole array, instead of no task at all.  A chunk size of 0
   still raises Division_by_zero.

   NOTE(review): with load balancing each worker folds the tasks it happens
   to receive into its own accumulator, and the accumulators are collected
   by worker index, so the position of a task's contribution in the final
   result depends on scheduling.

   If a worker reports an exception while tasks are still being handed out,
   the master process exits with status 1. *)
let mapper ncores ~chunksize compute opid al collect =
  let ln = Array.length al in
  match chunksize with
    None -> simplemapper ncores compute opid al collect (* no need of load balancing *)
  | Some v when ncores=ln/v -> simplemapper ncores compute opid al collect (* no need of load balancing *)
  | Some v ->
      (* init task parameters; at least one task as soon as there is data *)
      let chunksize = v
      and ntasks = if ln = 0 then 0 else max 1 (ln/v) in
      (* flush everything *)
      flush_all ();
      (* create descriptors to mmap *)
      let fdarr = Array.init ncores (fun _ -> tempfd ()) in
      (* setup communication channel with the workers: one downward pipe per
         worker, and a single upward pipe shared by all of them *)
      let pipedown = Array.init ncores (fun _ -> Unix.pipe ()) in
      let pipeup_rd,pipeup_wr = Unix.pipe () in
      let oc_up = Unix.out_channel_of_descr pipeup_wr in
      (* call the GC before forking *)
      Gc.compact ();
      (* spawn children *)
      for i = 0 to ncores-1 do
        match Unix.fork () with
          0 ->
            begin
              let d = Unix.gettimeofday () in
              (* primitives for communication *)
              Unix.close pipeup_rd;
              let receive,signal,return,finish,pid = setup_children_chans oc_up pipedown fdarr i in
              (* accumulator of this worker, threaded through all its tasks *)
              let reschunk = ref opid in
              let computetask n = (* compute chunk number n *)
                let lo = n*chunksize in
                let hi = if n=ntasks-1 then ln-1 else (n+1)*chunksize-1 in
                let exc_handler e j = (* handle an exception at index j *)
                  begin
                    let errmsg = Printexc.to_string e
                    in info "error at index j=%d in (%d,%d), chunksize=%d of a total of %d got exception %s on core %d \n%!"
                      j lo hi chunksize (hi-lo+1) errmsg i;
                    signal (Error (i,errmsg)); finish()
                  end
                in
                reschunk := compute al lo hi !reschunk exc_handler;
                info "worker on core %d (pid=%d), segment (%d,%d) of data of length %d, chunksize=%d finished in %f seconds"
                  i pid lo hi ln chunksize (Unix.gettimeofday() -. d)
              in
              while true do
                (* ask for work until we are finished *)
                signal (Ready i);
                match receive() with
                | Finished -> return (!reschunk:'d); finish ()
                | Task n -> computetask n
              done;
            end
        | -1 -> info "fork error: pid %d; i=%d" (Unix.getpid()) i;
        | _ -> ()
      done;

      (* close unused ends of the pipes *)
      Array.iter (fun (rfd,_) -> Unix.close rfd) pipedown;
      Unix.close pipeup_wr;

      (* get ic/oc/wfdl *)
      let ocs = Array.init ncores (fun n -> Unix.out_channel_of_descr (snd pipedown.(n))) in
      let ic = Unix.in_channel_of_descr pipeup_rd in

      (* feed workers until all tasks are finished *)
      for i=0 to ntasks-1 do
        match Marshal.from_channel ic with
          Ready w ->
            (debug "sending task %d to worker %d" i w;
             let oc = ocs.(w) in
             (Marshal.to_channel oc (Task i) []); flush oc)
        | Error (core,msg) -> (info "aborting due to exception on core %d: %s" core msg; exit 1)
      done;

      (* send termination token to all children *)
      Array.iter (fun oc ->
        Marshal.to_channel oc Finished [];
        flush oc;
        close_out oc
      ) ocs;

      (* wait for all children to terminate *)
      for i = 0 to ncores-1 do
        try ignore(Unix.wait())
        with Unix.Unix_error (Unix.ECHILD, _, _) -> ()
      done;

      (* read in all data *)
      let res = ref [] in
      (* iterate in reverse order, to accumulate in the right order *)
      for i = 0 to ncores-1 do
        res:= ((unmarshal fdarr.((ncores-1)-i)):'d)::!res;
      done;
      (* collect all results *)
      collect !res
;;

(* the parallel mapfold function *)

(* [parmapfold ~ncores ~chunksize f s op opid concat] applies [f] to every
   element of [s] and folds the images with [op], starting from [opid], on
   [ncores] workers; the per-worker results are then merged with [concat]
   (right to left, starting again from [opid]).
   When [f] or [op] raises, the worker-side handler receives the exception
   together with the absolute index of the offending element in the input
   array, so that the reported index lies in the (lo,hi) range printed
   next to it. *)
let parmapfold ?(ncores=1) ?(chunksize) (f:'a -> 'b) (s:'a sequence) (op:'b->'c->'c) (opid:'c) (concat:'c->'c->'c) : 'c=
  (* enforce array to speed up access to the list elements *)
  let al = match s with A al -> al | L l  -> Array.of_list l in
  let compute al lo hi previous exc_handler =
    (* iterate in reverse order, to accumulate in the right order *)
    let r = ref previous in
    for idx = hi downto lo do
      try
        r := op (f (Array.unsafe_get al idx)) !r
      with e -> exc_handler e idx
    done; !r
  in
  mapper ncores ~chunksize compute opid al  (fun r -> List.fold_right concat r opid)
;;

(* the parallel map function *)

(* [parmap ~ncores ~chunksize f s] computes the images by [f] of the
   elements of [s] on [ncores] workers and returns them as a list.
   A worker that is handed an empty range (hi < lo, which happens for an
   empty input or when there are fewer elements than cores) contributes
   nothing, instead of reading outside the array.
   When [f] raises, the worker-side handler receives the exception together
   with the absolute index of the offending element in the input array. *)
let parmap ?(ncores=1) ?chunksize (f:'a -> 'b) (s:'a sequence) : 'b list=
  (* enforce array to speed up access to the list elements *)
  let al = match s with A al -> al | L l  -> Array.of_list l in
  let compute al lo hi previous exc_handler =
    (* image of the element at offset j from lo *)
    let f' j = try f (Array.unsafe_get al (lo+j)) with e -> exc_handler e (lo+j) in
    (* iterate in reverse order, to accumulate in the right order, and add to acc;
       a negative offset means the range is exhausted (or was empty) *)
    let rec aux acc n =
      if n < 0 then acc else aux ((f' n)::acc) (n-1)
    in aux previous (hi-lo)
  in
  mapper ncores ~chunksize compute [] al  (fun r -> ExtLib.List.concat r)
;;

(* the parallel fold function *)

(* [parfold ~ncores ~chunksize op s opid concat] is parmapfold with the
   identity as mapping function: the elements of [s] are folded with [op]
   on the workers and the partial results are merged with [concat] *)
let parfold ?(ncores=1) ?chunksize (op:'a -> 'b -> 'b) (s:'a sequence) (opid:'b) (concat:'b->'b->'b) : 'b=
  let identity x = x in
  parmapfold ~ncores ?chunksize identity s op opid concat
;;


(* the parallel map function, on arrays *)

(* [map_intv lo hi f a] is the array of the images by [f] of the elements
   a.(lo) ... a.(hi), computed in increasing index order; it is the empty
   array when hi < lo.  The bounds are not checked against the length of
   [a]: the caller must pass indexes that are valid for [a]. *)
let map_intv lo hi f a =
  let l = hi-lo in
  if l < 0 then [||]
  else
    (* Array.init calls its function on 0, 1, ..., l in that order, which
       is the order the elements were computed in with the deprecated
       Array.create followed by a filling loop *)
    Array.init (l+1) (fun i -> f (Array.unsafe_get a (lo+i)))

(* [array_parmap ~ncores ~chunksize f al] computes the images by [f] of the
   elements of [al] on [ncores] workers and returns them as an array.
   Each chunk is mapped with map_intv and put in front of what the worker
   had already accumulated; an exception raised meanwhile is passed to the
   worker-side handler together with the lower bound of the chunk. *)
let array_parmap ?(ncores=1) ?chunksize (f:'a -> 'b) (al:'a array) : 'b array=
  let compute a lo hi previous exc_handler =
    try
      let mapped = map_intv lo hi f a in
      Array.concat [mapped; previous]
    with e -> exc_handler e lo
  in
  let collect parts = Array.concat parts in
  mapper ncores ~chunksize compute [||] al collect
;;


(* This code is highly optimised for operations on float arrays:

   - knowing the size of the result in advance allows us to
     pre-allocate it in a shared memory space as a Bigarray;

   - to write in the Bigarray memory area using the unsafe
     functions for Arrays, we trick the OCaml compiler into
     using the Bigarray memory as an Array as follows

       Array.unsafe_get (Obj.magic arr_out) 1

     This works because OCaml compiles access to float arrays
     as unboxed data, without further integrity checks;

   - the final copy into a real OCaml array is done via a memcpy in C.

     This approach gives a performance which is 2 to 3 times higher
     w.r.t. array_parmap, at the price of using Obj.magic and
     relying on knowledge of the internal representation of arrays and bigarrays.
 *)

(* raised by array_float_parmap when the shared buffer or the result array
   supplied by the caller is smaller than the input array *)
exception WrongArraySize

(* a float64 Bigarray paired with an int; init_shared_buffer sets the int
   to the number of elements the Bigarray was mapped with *)
type buf= (float, Bigarray.float64_elt, Bigarray.c_layout) Bigarray.Array1.t * int;; (* should be a long int some day *)

(* [init_shared_buffer a] maps a fresh temporary file as a shared float64
   Bigarray with as many elements as [a], and returns it paired with that
   element count.  The contents of [a] are not read. *)
let init_shared_buffer a =
  let len = Array.length a in
  let fd = tempfd () in
  let shared = Bigarray.Array1.map_file fd Bigarray.float64 Bigarray.c_layout true len in
  (* The descriptor can be closed right away: mmap() adds an extra reference
     to the file associated with the descriptor, which is not removed by a
     subsequent close() on that descriptor.
     http://pubs.opengroup.org/onlinepubs/009695399/functions/mmap.html
   *)
  Unix.close fd;
  (shared, len)
;;

(* [array_float_parmap ~ncores ~chunksize ~result ~sharedbuffer f al]
   computes the images by [f] of the elements of [al] on [ncores] workers
   and returns them as a float array.

   - [sharedbuffer], when given, is the Bigarray the workers write into; it
     must have room for at least as many elements as [al], otherwise
     WrongArraySize is raised.  Without it a fresh buffer is obtained from
     init_shared_buffer.
   - [result], when given, must be at least as long as [al], otherwise
     WrongArraySize is raised; it is handed to Bytearray.to_this_floatarray
     (presumably filled in place and returned; confirm in Bytearray).
     Without it the result comes from Bytearray.to_floatarray.

   The workers return nothing through the mapper: they only write element
   [i] of the output at index [i] of the shared buffer. *)
let array_float_parmap ?(ncores=1) ?chunksize ?result ?sharedbuffer (f:'a -> float) (al:'a array) : float array =
  let size = Array.length al in
  (* output buffer: the one supplied by the caller, after a size check, or a fresh one *)
  let barr_out = 
    match sharedbuffer with
      Some (arr,s) -> 
	if s<size then 
	  (info "shared buffer is too small to hold the input in array_float_parmap"; raise WrongArraySize)
	else arr
    | None -> fst (init_shared_buffer al)
  in
  (* trick the compiler into accessing the Bigarray memory area as a float array:
     the data in Bigarray is placed at offset 1 w.r.t. a normal array, so we
     get a pointer to that zone into arr_out_as_array, and have it typed as a float
     array *)
  let barr_out_as_array = Array.unsafe_get (Obj.magic barr_out) 1 in
  (* the array and accumulator arguments are ignored: the closure reads [al]
     and writes the shared buffer directly; on an exception the handler is
     given the lower bound of the chunk *)
  let compute _ lo hi _ exc_handler =
    try
      for i=lo to hi do 
	Array.unsafe_set barr_out_as_array i (f (Array.unsafe_get al i)) 
      done
    with e -> exc_handler e lo
  in
  (* unit accumulator and a collector that drops its argument: only the
     side effect on the shared buffer matters *)
  mapper ncores ~chunksize compute () al (fun r -> ());
  (* turn the first [size] elements of the shared buffer into the float array to return *)
  let res = 
    match result with
      None -> Bytearray.to_floatarray barr_out size
    | Some a -> 
	if Array.length a < size then
	  (info "result array is too small to hold the result in array_float_parmap"; raise WrongArraySize)
        else
	  Bytearray.to_this_floatarray a barr_out size
  in res
;;  

back to top

Software Heritage — Copyright (C) 2015–2026, The Software Heritage developers. License: GNU AGPLv3+.
The source code of Software Heritage itself is available on our development forge.
The source code files archived by Software Heritage are available under their own copyright and licenses.
Terms of use: Archive access, API— Content policy— Contact— JavaScript license information— Web API