(* swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9
   Raw File
   Tip revision: ad58ae426e7e9200530d18bf439d02657503426c authored by fclement on 23 November 2010, 11:33:06 UTC
   Ignore all generated files.
   Tip revision: ad58ae4
   commlib.ml *)
(***********************************************************************)
(*                                                                     *)
(*                          OCamlP3l                                   *)
(*                                                                     *)
(* (C) 2004-2007                                                       *)
(*             Roberto Di Cosmo (dicosmo@dicosmo.org)                  *)
(*             Zheng Li (zli@lip6.fr)                                  *)
(*             Pierre Weis (Pierre.Weis@inria.fr)                      *)
(*             Francois Clement (Francois.Clement@inria.fr)            *)
(*                                                                     *)
(* Based on original Ocaml P3L System                                  *)
(* (C) 1997 by                                                         *)
(*             Roberto Di Cosmo (dicosmo@ens.fr)                       *)
(*             Marco Danelutto (marcod@di.unipi.it)                    *)
(*             Xavier Leroy  (Xavier.Leroy@inria.fr)                   *)
(*             Susanna Pelagatti (susanna@di.unipi.it)                 *)
(*                                                                     *)
(* This program is free software; you can redistribute it and/or       *)
(* modify it under the terms of the GNU Library General Public License *)
(* as published by the Free Software Foundation; either version 2      *)
(* of the License, or (at your option) any later version.              *)
(*                                                                     *)
(* This program is distributed in the hope that it will be useful,     *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of      *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the       *)
(* GNU Library General Public License for more details.                *)
(*                                                                     *)
(***********************************************************************)

(* $Id: commlib.ml,v 1.22 2007-01-23 15:50:04 weis Exp $ *)

(* 

   OcamlP3L Communication Library.

   Here we abstract away details of the
   low level architecture for communication among nodes

   This means that we provide an abstract send and receive set
   of operations with or without closures...

   Most notably, we also try to implement a notion of
   "communicator" like in MPI to be able to selectively
   input data on certain given channels only. On unix
   sockets, this is just a set of addresses/ports to be given to
   a select operation...

*)
open Printf
open Unix
open Basedefs
open Server (* this module actually depends on the model, which is chosen at compilation time via
               OcamlP4 preprocessing *)

(* Re-export the debugging helpers that the [open]s above bring into scope,
   so that users of Commlib can reach them as [Commlib.debug] etc.
   [debug] accepts an optional [~mask] argument (used e.g. with
   [~mask:128] when tracing low-level reads). *)

let debug = debug
and context = context
and timedcontext = timedcontext;;

(*** ABSTRACTIONS FOR PROCESSOR IDS
     PHILOSOPHY: this layer should be independent of
                 the actual communication layer used: sockets, PVM, MPI etc.

     + each communication channel should contain information about the
       virtual processor it is connected to, besides the data structures
       necessary to perform the actual communication

     + this vp info is used to properly select the communicators (=sets of vps)
       used during each operation, and eventually will be used to support
       broadcast, multicast, scatter and gather operations like in MPI

 ***)


(* Identifier of a virtual processor. *)
type vpinfo = int;;

(* A channel to a virtual processor: the peer's vpinfo, the socket
   descriptor used to talk to it, and the socket address attached to it. *)
type vp_chan = VpCh of vpinfo * Unix.file_descr * Unix.sockaddr;;

(* What can be shipped to other processes about a channel: the vpinfo and
   the socket address (a file descriptor is meaningless elsewhere). *)
type vp_extern = VpX of vpinfo * Unix.sockaddr;;

(* Set the close-on-exec flag on the descriptor carried by a channel. *)
let ch_set_close_on_exec = function
  | VpCh (_, fd, _) -> Unix.set_close_on_exec fd;;

(* Placeholder values for when no real peer information is available. *)
let dummy_vpinfo = -1
and dummy_sockaddr = ADDR_UNIX "fake_sockaddr";;
let dummy_vpch = VpCh (dummy_vpinfo, Unix.stdin, dummy_sockaddr);;


(* This node's own vpinfo, stored in a cell obtained from
   [Server.mklocalref].
   [setmyvpinfo n] records [n].
   [myvpinfo ()] returns the recorded value; if none was recorded it prints
   a diagnostic and raises [Failure "Internal error"]. *)

let setmyvpinfo, myvpinfo =
  let (read_cell, write_cell) = Server.mklocalref (None : vpinfo option) in
  let setter n = write_cell (Some n) in
  let getter () =
    match read_cell () with
    | None ->
        printf "%t: Commlib.myvpinfo: vpinfo not initialised for this node!"
          timedcontext;
        failwith "Internal error"
    | Some n -> n
  in
  (setter, getter)
;;

(* [find_ch_of_vpinfo i chl] returns the first channel of [chl] whose vpinfo
   is [i]. Raises [Failure] with an explanatory message if there is none. *)
let find_ch_of_vpinfo i chl =
  try List.find (fun (VpCh (n, _, _)) -> n = i) chl
  with Not_found ->
    failwith
      ("Commlib.find_ch_of_vpinfo: Channel for node " ^ string_of_int i
       ^ " not found! Aborting");;

(*** Channel/vp management: creation of input port, connection to output port,
     closing channels ***)

(* give me back my node identifier, which contains the initialized receive socket *)

(* [get_free_addr ()] creates a TCP socket bound to a free port of this
   machine and returns [(socket, bound_address)].

   The IP address is [!Basedefs.ip] when set, otherwise the first address
   returned by [gethostbyname (gethostname ())].
   DO NOT USE [Unix.inet_addr_any] HERE: the resulting address is SENT to
   other processes, so it must be a concrete address.
   TODO: handle all error cases (e.g. an unresolvable host name), and try
   localhost as a fallback.

   Raises [Failure "get_free_addr: ports exhausted"] if binding (or reading
   back the bound address) fails; the socket is closed in that case. *)
let get_free_addr () =
  let thisaddr =
    match !Basedefs.ip with
    | Some ip ->
        debug (fun () -> printf "%t: forcing ip address to %s\n" context ip);
        Unix.inet_addr_of_string ip
    | None ->
        let myip = (gethostbyname (gethostname ())).h_addr_list.(0) in
        (* This address was detected, not forced: say so in the trace. *)
        debug (fun () ->
          printf "%t: using ip address %s\n" context
            (Unix.string_of_inet_addr myip));
        myip
  in
  let sockin = Unix.socket PF_INET SOCK_STREAM 0 in
  (* Setting TCP_NODELAY on [sockin] would be desirable, but needs an
     extension to OCaml's unix library. *)
  (* Port 0 asks the kernel for the first free unprivileged port. *)
  let inch = ADDR_INET (thisaddr, 0) in
  try
    Unix.bind sockin inch;
    (sockin, Unix.getsockname sockin)
  with _ ->
    Unix.close sockin;
    failwith "get_free_addr: ports exhausted";;

(* [getport addr] returns the port number of an Internet socket address.
   Raises [Failure] on a Unix-domain address. *)

let getport addr =
  match addr with
  | ADDR_UNIX _ -> failwith "Commlib: invalid argument to getport"
  | ADDR_INET (_, port) -> port;;

(* [get_addr_port addr] splits an Internet socket address into its IP
   address and port number.
   Raises [Failure] on a Unix-domain address. The message names this
   function, not [getport]. *)
let get_addr_port =
  function
  | ADDR_INET (a, p) -> (a, p)
  | ADDR_UNIX _ -> failwith "Commlib: invalid argument to get_addr_port";;

(* [mkvp n fanin] creates the input end of virtual processor [n]. It takes
   a socket bound to a free local port (from [get_free_addr]), puts it in
   listening mode, and wraps it as a channel tagged with [n].

   WARNING: the listen queue length is sensitive. Incoming connections can
   be lost if we are slow in accepting; it appears that the sender does not
   get the ECONNREFUSED it would need when the queue is full. The queue is
   therefore at least [fanin] long, and never shorter than 3. *)
let mkvp n fanin =
  let (sockin, inch) = get_free_addr () in
  let queuelength = max 3 fanin in
  debug (fun () ->
    printf "%t: listening on socket with queue length %d" timedcontext
      queuelength);
  Unix.listen sockin queuelength;
  let port = getport inch in
  debug (fun () -> printf "%t: input on port %d" context port);
  VpCh (n, sockin, inch);;

(* [vp_extern ch] keeps the part of a channel that can be sent to other
   processors: its vpinfo and socket address, dropping the descriptor. *)

let vp_extern = function VpCh (n, _, inch) -> VpX (n, inch);;

(* [vp_extern_print x] prints [x] on stdout as "vp[<n>] = <ip>/<port>  ".
   Raises [Failure] if the address of [x] is a Unix-domain address. *)
let vp_extern_print =
  function
  | VpX (n, ADDR_INET (ia, p)) ->
      printf "vp[%d] = %s/%d  " n (string_of_inet_addr ia) p
  | VpX (_, ADDR_UNIX _) ->
      failwith "Commlib: invalid argument to vp_extern_print";;

(* [vp_info ch] is the vpinfo of the peer of channel [ch]. *)
let vp_info = function VpCh (n, _, _) -> n;;

(* [name_of_descr ?peer fd] gives a printable name for socket [fd], used in
   debug messages: "ip:port" for Internet sockets, the path for Unix-domain
   ones. It describes the remote end when [peer] is true (the default) and
   the local end otherwise. If the address cannot be obtained, the exception
   is reported through [debug] and "fake" is returned. *)

let name_of_descr ?(peer = true) fd =
  let lookup = if peer then Unix.getpeername else Unix.getsockname in
  try
    (match lookup fd with
     | ADDR_UNIX path -> path
     | ADDR_INET (ip, port) ->
         string_of_inet_addr ip ^ ":" ^ string_of_int port)
  with e ->
    debug (fun () ->
      printf "%t: exception %s in name_of_descr" timedcontext
        (Printexc.to_string e));
    "fake"
;;

(* [addr_of_vpch ?peer ch] is [name_of_descr ?peer] applied to the
   descriptor of channel [ch]. *)
let addr_of_vpch ?(peer = true) ch =
  match ch with VpCh (_, fd, _) -> name_of_descr ~peer fd;;

(* [vp_close ch] closes the descriptor of channel [ch], after tracing the
   local and remote ends through [debug].
   Raises [Failure] if closing fails; the message carries the underlying
   Unix error instead of discarding it. *)

let vp_close (VpCh (i, fd, _)) =
  debug (fun () ->
    printf "%t: closing channel from [%d] to [%d] %s -> %s.\n" timedcontext
      (myvpinfo ()) i (name_of_descr ~peer:false fd) (name_of_descr fd));
  try Unix.close fd
  with Unix_error (err, _, _) ->
    failwith ("Error on closing channel: " ^ Unix.error_message err);;

(*** SEND AND RECEIVE FUNCTIONS ***)

(* [really_write fd buf pos len] writes the [len] bytes of [buf] that start
   at [pos] to [fd], issuing as many [Unix.write] calls as needed.
   This function does not check the bounds of [buf] itself. *)
let really_write fd buf pos len =
  let rec loop pos remaining =
    if remaining > 0 then begin
      let written = Unix.write fd buf pos remaining in
      debug (fun () ->
        printf "%t: asked output for %d data, really wrote %d bytes" context
          remaining written);
      loop (pos + written) (remaining - written)
    end
  in
  loop pos len
;;

(* [unsafe_really_read fd buf pos len] reads exactly [len] bytes from [fd]
   into [buf] starting at [pos], looping over [Unix.read] as needed.
   This function does not check the bounds of [buf] itself.
   If [Unix.read] reports end of file before [len] bytes have arrived, it
   prints a message naming both ends of the socket and raises
   [End_of_file]. *)
let unsafe_really_read fd buf pos len =
  let rec loop pos remaining =
    if remaining > 0 then begin
      let got = Unix.read fd buf pos remaining in
      debug ~mask:128 (fun () ->
        printf "%t: asked for %d data, really read %d bytes" context
          remaining got);
      if got = 0 then begin
        printf "%t: eof on channel %s -> %s" timedcontext (name_of_descr fd)
          (name_of_descr ~peer:false fd);
        raise End_of_file
      end;
      loop (pos + got) (remaining - got)
    end
  in
  loop pos len
;;

(* Printable name of a marshalling flag, for debug messages. *)
let flag_to_string =
  function
  | Marshal.Closures -> "Closures"
  | Marshal.No_sharing -> "No_sharing";;

(* Printable, space-separated list of marshalling flags, for debug messages.
   The empty list gives "". Names are joined with one space, so they never
   run together and never gain a spurious leading space. *)
let flags_to_string fl = String.concat " " (List.map flag_to_string fl);;

(* [gen_cl_send ch data flags] marshals [data] with the marshalling flags
   [flags] and writes the resulting bytes to the descriptor of [ch]. It
   traces the operation through [debug] before and after the write. *)
let gen_cl_send (VpCh (n, fd, _)) data flags =
  let local () = name_of_descr ~peer:false fd
  and remote () = name_of_descr fd in
  debug (fun () ->
    printf "%t: sending data with flags %s to [%d] on channel %s -> %s.\n"
      timedcontext (flags_to_string flags) n (local ()) (remote ()));
  let bytes = Marshal.to_string data flags in
  really_write fd bytes 0 (String.length bytes);
  debug (fun () ->
    printf "%t: sent data to [%d] on channel %s -> %s.\n" timedcontext n
      (local ()) (remote ()))
;;


(* [cl_send ch data] sends [data] on [ch] with the [Marshal.Closures] flag,
   so [data] may contain functional values. *)
let cl_send ch data =
  let flags = [ Marshal.Closures ] in
  gen_cl_send ch data flags;;

(* [nocl_send ch data] sends [data] on [ch] with no marshalling flags; per
   the Marshal documentation, functional values in [data] are then
   rejected. *)
let nocl_send ch data =
  let flags = [] in
  gen_cl_send ch data flags;;

(* Receive buffer shared by every call to [marshal_read]. It initially holds
   a marshal header plus 128 bytes of data and is enlarged on demand. *)
let bsize = Marshal.header_size + 128;;
let buf = ref (String.create bsize);;

(* [marshal_read ic] reads one marshalled value from descriptor [ic] and
   returns it. The value comes from [Marshal.from_string], so its type is
   not checked: callers must know what they expect.
   It first reads the fixed-size header to learn the size of the data part,
   enlarges the shared buffer [buf] if needed, then reads the data part.
   Reading into our own buffer, rather than a buffered in_channel, keeps
   [Unix.select] on the descriptor accurate.
   Not reentrant, because the buffer is shared.
   Raises [End_of_file] if the peer closes the connection mid-message. *)
let marshal_read ic =
  unsafe_really_read ic !buf 0 Marshal.header_size;
  let data_size = Marshal.data_size !buf 0 in
  let total_size = Marshal.header_size + data_size in
  if String.length !buf < total_size then begin
    (* Enlarge the shared buffer itself, keeping the header already read,
       so later large messages reuse it instead of allocating again. *)
    let bigger = String.create total_size in
    String.blit !buf 0 bigger 0 Marshal.header_size;
    buf := bigger
  end;
  unsafe_really_read ic !buf Marshal.header_size data_size;
  Marshal.from_string !buf 0;;

(* [receive ch] reads one marshalled value from the descriptor of [ch]
   (through [marshal_read]) and traces the operation through [debug]
   before and after. This is the only single-channel receive function. *)
let receive (VpCh (n, fd, _)) =
  let remote () = name_of_descr fd
  and local () = name_of_descr ~peer:false fd in
  debug (fun () ->
    printf "%t: reading data from [%d] on channel %s -> %s.\n" timedcontext n
      (remote ()) (local ()));
  let value = marshal_read fd in
  debug (fun () ->
    printf "%t: read data from [%d] on channel %s -> %s.\n" timedcontext n
      (remote ()) (local ()));
  value;;

(* communicators... *)

(* receiving data from a group of channels *)
(* Notice: in_channel, out_channel are _buffered_ in Ocaml, so we cannot just
   do a receive ic if we are alternating reads from a set of channels, as this
   requires to use select to find a ready channel, while we have no way to know
   if some additional data is already available in the input buffer of one of the channels (thanx, Jerome!)
   Hence, we need to implement our own buffer.
 *)

(* [receive_any icl] blocks (no timeout) in [Unix.select] until at least one
   channel of [icl] is readable. It then reads one marshalled value from the
   first ready descriptor and returns [(value, channel)].
   If select reports exceptional conditions, a message is printed; if in
   addition no descriptor is readable, [Failure] is raised. *)
let receive_any icl =
  let fdl = List.map (fun (VpCh (_, fd, _)) -> fd) icl in
  debug (fun () -> printf "%t: selecting..." context; print_newline ());
  let (ready, _, exc) = Unix.select fdl [] fdl (-1.0) in
  debug (fun () -> printf "%t: selected..." context; print_newline ());
  if exc <> [] then begin
    printf "%t: exceptional conditions found on an input channel ..."
      timedcontext;
    if ready = [] then failwith "no ready channels, exiting"
  end;
  let chan_of_fd fd =
    try List.find (fun (VpCh (_, fd', _)) -> fd = fd') icl
    with Not_found -> failwith "Commlib.receive_any: no ready descriptor"
  in
  let fd = List.hd ready in
  (marshal_read fd, chan_of_fd fd)
;;


(* Re-export [establish_smart_server] from the modules opened above, with
   the callback adapted to channels. [f] receives a [vp_chan] built from the
   descriptor and socket address passed to the underlying callback, tagged
   with the placeholder [dummy_vpinfo]. *)

let establish_smart_server f =
  establish_smart_server (fun fd sockaddr ->
    f (VpCh (dummy_vpinfo, fd, sockaddr)));;

(** channel/network utility routines *)

(* Connections at the file-descriptor level. *)

(* [fd_open_connection sockaddr] creates a stream socket in the domain that
   matches [sockaddr] (PF_INET or PF_UNIX) and connects it to [sockaddr].
   If connecting fails, the socket is closed and the exception re-raised. *)
let fd_open_connection sockaddr =
  let sock =
    match sockaddr with
    | ADDR_INET _ -> socket PF_INET SOCK_STREAM 0
    | ADDR_UNIX _ -> socket PF_UNIX SOCK_STREAM 0
  in
  try
    connect sock sockaddr;
    sock
  with exn ->
    close sock;
    raise exn
;;

(* [get_connection (addr, port)] opens a TCP connection to [addr]:[port].
   An attempt refused with ECONNREFUSED is retried, waiting one second
   between attempts, for at most 10 attempts; other errors propagate
   immediately. When every attempt is refused, the failure is traced with
   the port actually tried, and
   [Unix_error (ECONNREFUSED, "get_connection", "")] is raised. *)
let get_connection (addr, port) =
  let rec try_connect attempts_left =
    if attempts_left <= 0 then begin
      debug (fun () ->
        printf "%t: could not connect to %s on port %d" context
          (string_of_inet_addr addr) port);
      raise (Unix_error (ECONNREFUSED, "get_connection", ""))
    end else
      try fd_open_connection (ADDR_INET (addr, port))
      with Unix_error (ECONNREFUSED, _, _) ->
        sleep 1;
        try_connect (attempts_left - 1)
  in
  try_connect 10
;;

(* [doaccept s] accepts one connection on the listening socket [s] and
   returns the [(fd, peer_address)] pair given by [Unix.accept].
   Only Internet-domain peers are expected. For any other peer, the accepted
   descriptor is closed (not leaked) and [Failure] is raised. *)
let doaccept s =
  debug (fun () -> printf "%t: accepting" context);
  let ((fd, peer) as accepted) = Unix.accept s in
  match peer with
  | ADDR_INET (ia, p) ->
      debug (fun () ->
        printf "%t: accepted on port %d from %s" context p
          (string_of_inet_addr ia));
      accepted
  | ADDR_UNIX _ ->
      Unix.close fd;
      failwith "Commlib: invalid argument to doaccept";;

(* [do_accepts i sockin dbgstring] accepts [i] incoming connections on the
   listening socket [sockin]. It returns their descriptors in the order they
   were accepted, or [] when [i <= 0]. [dbgstring] only labels debug output.
   Only Internet-domain peers are expected. If another kind of peer shows
   up, every descriptor accepted so far (including the current one) is
   closed, so none leaks, and [Failure] is raised. *)
let do_accepts i sockin dbgstring =
  let rec loop remaining acc =
    if remaining <= 0 then List.rev acc
    else begin
      debug (fun () ->
        printf "PID %d: do_accept %s ACCEPTING" (Unix.getpid ()) dbgstring);
      let (c, unx_addr) = Unix.accept sockin in
      match unx_addr with
      | Unix.ADDR_INET (_, pn) ->
          debug (fun () ->
            printf "PID %d: do_accept ACCEPTED port num %d " (Unix.getpid ())
              pn);
          loop (remaining - 1) (c :: acc)
      | Unix.ADDR_UNIX _ ->
          List.iter Unix.close (c :: acc);
          failwith "Commlib: invalid argument to do_accepts"
    end
  in
  loop i []
;;

(* [proc_inet_addr proc] parses a virtual-processor description of the form
     host[:port][#color][%capability]
   where host is an IP number or a symbolic host name. It returns
   [((inet_addr, port), color, capability)].
   Defaults: port = [p3lport], color = 0, capability = -1.
   Capability -1 means "not specified". In strict mapping, "h:p#c" and
   "h:p#c%1" both mean capability 1. In fuzzy mapping, they mean "don't
   care" and capability 1 respectively, so the two cases must remain
   distinguishable.
   If a suffix is malformed, it is not stripped and the default is used
   for that field.
   Raises [Failure] if the host name cannot be resolved. *)

let proc_inet_addr proc =
  (* [split_suffix sep default s] cuts [s] at the first [sep] and parses the
     rest as an integer; on any failure it returns [(s, default)]. *)
  let split_suffix sep default s =
    try
      let pos = String.index s sep in
      (String.sub s 0 pos,
       int_of_string (String.sub s (pos + 1) (String.length s - pos - 1)))
    with _ -> (s, default)
  in
  let (proc, cap) = split_suffix '%' (-1) proc in
  let (proc, col) = split_suffix '#' 0 proc in
  let (proc, port) = split_suffix ':' p3lport proc in
  let addr =
    try inet_addr_of_string proc
    with Failure "inet_addr_of_string" ->
      (try (gethostbyname proc).h_addr_list.(0)
       with Not_found ->
         failwith ("Commlib : proc_inet_addr : Unknown host " ^ proc))
  in
  ((addr, port), col, cap);;


(*********** HIGHER LEVEL FUNCTIONS ****************)

(* [vpchan_of_addr (n, (ia, p))] connects to [ia]:[p] through
   [get_connection] and returns [(n, ch)], where [ch] is the resulting
   channel tagged with vpinfo [n]. *)

let vpchan_of_addr (n, (ia, p)) =
  let fd = get_connection (ia, p) in
  (n, VpCh (n, fd, ADDR_INET (ia, p)));;

(* [outchs_of_vps vpchl myvpdata] opens one output channel per element of
   [vpchl], in list order. For each element it connects to the advertised
   address, then sends our own vpinfo (taken from [myvpdata]) so that the
   receiving node can identify us. It returns the channels, each tagged
   with the peer's vpinfo. *)
let outchs_of_vps vpchl myvpdata =
  let connect_to (VpX (n, outaddr)) =
    let (ia, p) = get_addr_port outaddr in
    debug (fun () ->
      printf "%t: connecting a channel to %s port %d\n" context
        (string_of_inet_addr ia) p);
    let sout = get_connection (ia, p) in
    (* Setting TCP_NODELAY on [sout] would be desirable, but needs an
       extension to OCaml's unix library. *)
    debug (fun () ->
      printf "%t: connected a channel to %s port %d\n" context
        (string_of_inet_addr ia) p);
    debug (fun () ->
      printf "%t: Sending vpinfo %d to output node..." timedcontext
        (vp_info myvpdata));
    let vpch = VpCh (n, sout, ADDR_INET (ia, p)) in
    cl_send vpch (vp_info myvpdata);
    vpch
  in
  List.map connect_to vpchl;;

(* [setup_receive_chan myvpchan] accepts one incoming connection on the
   listening socket of [myvpchan], then reads the vpinfo that the peer sends
   first. It returns the new channel tagged with that vpinfo. *)
let setup_receive_chan (VpCh (_, listening, _)) =
  let (ufd, saddr) = doaccept listening in
  debug (fun () ->
    printf "%t: Receiving vpinfo from input node..." timedcontext);
  let peervpinfo = marshal_read ufd in
  debug (fun () ->
    printf "%t: Received vpinfo %d from input node..." timedcontext
      peervpinfo);
  VpCh (peervpinfo, ufd, saddr);;

(* [sameorderof vpinfol vpinchl] returns one channel per element of
   [vpinfol], in the same order. Each channel is looked up in [vpinchl] by
   its vpinfo; the first match wins.
   Raises [Not_found] if some vpinfo of [vpinfol] has no channel. *)

let sameorderof vpinfol vpinchl =
  let by_vpinfo =
    List.map (fun (VpCh (v, fd, addr)) -> (v, (fd, addr))) vpinchl
  in
  let rebuild v =
    let (fd, addr) = List.assoc v by_vpinfo in
    VpCh (v, fd, addr)
  in
  List.map rebuild vpinfol;;

(* [setup_receive_chans icl myvpchan dbgstring] accepts one connection per
   element of [icl] on [myvpchan] (see [setup_receive_chan]). It returns the
   resulting channels in the vpinfo order given by [icl]. [dbgstring] only
   labels debug output.
   NOTE: if the peers that actually connected do not match [icl], the
   reordering raises. Should this be smarter in case of phased
   reconfiguration? *)
let setup_receive_chans icl myvpchan dbgstring =
  let accept_one _ = setup_receive_chan myvpchan in
  let unordered_chans = List.map accept_one icl in
  debug (fun () -> printf "%t: %s got all input channels" context dbgstring);
  sameorderof icl unordered_chans;;

(* Re-export [spawn] from the modules opened above, as an explicit
   two-argument wrapper. *)
let spawn f arg = spawn f arg;;
(* back to top *)