swh:1:snp:d7f1b9eb7ccb596c2622c4780febaa02549830f9
Tip revision: ad58ae426e7e9200530d18bf439d02657503426c authored by fclement on 23 November 2010, 11:33:06 UTC
Ignore all generated files.
Ignore all generated files.
Tip revision: ad58ae4
commlib.ml
(***********************************************************************)
(* *)
(* OCamlP3l *)
(* *)
(* (C) 2004-2007 *)
(* Roberto Di Cosmo (dicosmo@dicosmo.org) *)
(* Zheng Li (zli@lip6.fr) *)
(* Pierre Weis (Pierre.Weis@inria.fr) *)
(* Francois Clement (Francois.Clement@inria.fr) *)
(* *)
(* Based on original Ocaml P3L System *)
(* (C) 1997 by *)
(* Roberto Di Cosmo (dicosmo@ens.fr) *)
(* Marco Danelutto (marcod@di.unipi.it) *)
(* Xavier Leroy (Xavier.Leroy@inria.fr) *)
(* Susanna Pelagatti (susanna@di.unipi.it) *)
(* *)
(* This program is free software; you can redistribute it and/or *)
(* modify it under the terms of the GNU Library General Public License *)
(* as published by the Free Software Foundation; either version 2 *)
(* of the License, or (at your option) any later version. *)
(* *)
(* This program is distributed in the hope that it will be useful, *)
(* but WITHOUT ANY WARRANTY; without even the implied warranty of *)
(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *)
(* GNU Library General Public License for more details. *)
(* *)
(***********************************************************************)
(* $Id: commlib.ml,v 1.22 2007-01-23 15:50:04 weis Exp $ *)
(*
OcamlP3L Communication Library.
Here we abstract away details of the
low level architecture for communication among nodes
This means that we provide an abstract send and receive set
of operations with or without closures...
Most notably, we also try to implement a notion of
"communicator" like in MPI to be able to selectively
input data on certain given channels only. On unix
sockets, this is just a set of addresses/ports to be given to
a select operation...
*)
open Printf
open Unix
open Basedefs
open Server (* this module actually depends on the model, which is chosen at compilation time via
OcamlP4 preprocessing *)
(* Re-export the debugging helpers from whichever opened module (Basedefs or
   Server) defines them, so clients of Commlib can use them directly.
   [debug] is called below both with and without [~mask:128], so it
   presumably takes an optional [?mask] argument — confirm in its defining
   module. [context] and [timedcontext] are used as [%t] printers in the
   format strings of this file. *)
let debug=debug;;
let context=context;;
let timedcontext=timedcontext;;
(*** ABSTRACTIONS FOR PROCESSOR IDS
PHILOSOPHY: this layer should be independent of
the actual communication layer used: sockets, PVM, MPI etc.
+ each communication channel should contain information about the
virtual processor it is connected to, besides the data structures
necessary to perform the actual communication
+ this vp info is used to properly select the communicators (=sets of vps)
used during each operation, and eventually will be used to support
broadcast, multicast, scatter and gather operations like in MPI
***)
(* Identifier of a virtual processor (node). *)
type vpinfo = int;;
(* A channel: a vpinfo, the underlying socket, and a socket address.
   For this node's listening endpoint (see mkvp) the vpinfo is our own;
   for accepted/connected channels it is the peer's. *)
type vp_chan = VpCh of vpinfo * Unix.file_descr * Unix.sockaddr;;
(* The part of a channel that can be sent to other processors: its vpinfo
   and address, without the process-local file descriptor. *)
type vp_extern = VpX of vpinfo * Unix.sockaddr;;
(* Set the close-on-exec flag on the socket wrapped by a vp_chan, so the
   descriptor is not inherited across an exec. *)
let ch_set_close_on_exec = function
  | VpCh (_, sock, _) -> Unix.set_close_on_exec sock;;
(* Placeholder vpinfo used when the real identity of a channel's peer is not
   known (e.g. for channels built by establish_smart_server below). *)
let dummy_vpinfo = -1;;
(* Placeholder socket address. *)
let dummy_sockaddr = ADDR_UNIX("fake_sockaddr");;
(* Placeholder channel. NOTE(review): it wraps Unix.stdin, so passing it to
   vp_close would close stdin. *)
let dummy_vpch = VpCh (dummy_vpinfo,Unix.stdin,dummy_sockaddr);;
(* This node's own vpinfo, kept in a cell created by Server.mklocalref.
   [setmyvpinfo n] records it; [myvpinfo ()] returns it, or prints a
   diagnostic and fails with "Internal error" if it was never recorded. *)
let setmyvpinfo, myvpinfo =
  let (read_cell, write_cell) = Server.mklocalref (None : vpinfo option) in
  let record n = write_cell (Some n) in
  let lookup () =
    match read_cell () with
    | Some n -> n
    | None ->
        printf "%t: Commlib.myvpinfo: vpinfo not initialised for this node!" timedcontext;
        failwith "Internal error"
  in
  (record, lookup)
;;
(* [find_ch_of_vpinfo i chl] returns the first channel of [chl] whose vpinfo
   is [i], and fails with a descriptive message when there is none. *)
let find_ch_of_vpinfo i =
  let has_id (VpCh (id, _, _)) = id = i in
  fun chl ->
    try List.find has_id chl
    with Not_found ->
      failwith ("Commlib.find_ch_of_vpinfo: Channel for node "^(string_of_int i)^" not found! Aborting");;
(*** Channel/vp management: creation of input port, connection to output port,
closing channels ***)
(* give me back my node identifier, which contains the initialized receive socket *)
(* a routine to get a free address on the local machine *)
(* Create a TCP socket bound to this host's IP address on a port chosen by
   the kernel, and return it with its bound address (from getsockname).
   The IP is Basedefs.ip when that is set; otherwise it is the first address
   that gethostbyname returns for the local host name.
   DO NOT USE Unix.inet_addr_any HERE: this address will be SENT to other
   processes!
   TODO: handle all error cases (gethostbyname may raise Not_found), and we
   should try localhost also!!!
   Raises Failure "get_free_addr: ports exhausted" if bind/getsockname fails
   with a Unix error. Any other exception is re-raised. The socket is closed
   in both cases. *)
let get_free_addr () =
  let thisaddr =
    match !Basedefs.ip with
    | Some ip ->
        debug (fun () -> printf "%t: forcing ip address to %s\n" context ip);
        Unix.inet_addr_of_string ip
    | None ->
        let myip = (gethostbyname (gethostname ())).h_addr_list.(0) in
        (* not forced: this is the host's default address *)
        debug (fun () -> printf "%t: using default ip address %s\n" context (Unix.string_of_inet_addr myip));
        myip
  in
  let sockin = Unix.socket PF_INET SOCK_STREAM 0 in
  (* Unix.tcpsetsockopt sockin TCP_NODELAY would need an extension to the
     OCaml unix library which is not there yet. *)
  (* Port 0 lets the kernel pick a free unprivileged port, thanx Xavier. *)
  let inch = ADDR_INET (thisaddr, 0) in
  try
    Unix.bind sockin inch;
    (sockin, Unix.getsockname sockin)
  with e ->
    (* never leak the socket, but only translate genuine Unix errors *)
    Unix.close sockin;
    (match e with
     | Unix.Unix_error _ -> failwith "get_free_addr: ports exhausted"
     | _ -> raise e)
;;
(* Make the pattern-matching analyser happy *)
(* Port number of an Internet socket address; fails on Unix-domain
   addresses. *)
let getport addr =
  match addr with
  | ADDR_INET (_, port) -> port
  | ADDR_UNIX _ -> failwith "Commlib: invalid argument to getport";;
(* Split an Internet socket address into its (inet_addr, port) pair; fails
   on Unix-domain addresses with a message naming this function. *)
let get_addr_port = function
  | ADDR_INET (a, p) -> (a, p)
  | ADDR_UNIX _ -> failwith "Commlib: invalid argument to get_addr_port";;
(* Create this node's listening endpoint. The socket comes from
   get_free_addr and is put into listening mode. It is returned as a
   vp_chan tagged with vpinfo [n].
   [fanin] sizes the listen queue, which holds at least 3 entries.
   WARNING: the size of the queue is sensitive: we can lose packets if we
   are slow in accepting. It appears that the sender is not able to receive
   the necessary ECONNREFUSED if the queue is full.
   If Unix.listen fails, the socket is closed and the error re-raised. *)
let mkvp n fanin =
  let (sockin, inch) = get_free_addr () in
  let queuelength = max 3 fanin in
  debug (fun () -> printf "%t: listening on socket with queue length %d" timedcontext queuelength);
  (try Unix.listen sockin queuelength
   with e -> Unix.close sockin; raise e);
  (* debug here! *)
  let p = getport inch in
  debug (fun () -> printf "%t: input on port %d" context p);
  VpCh (n, sockin, inch);;
(* Keep only what can be sent to other processors: the channel's vpinfo and
   socket address (the file descriptor is local to this process). *)
let vp_extern = function
  | VpCh (id, _, addr) -> VpX (id, addr);;
(* Print an external vp description on stdout as "vp[N] = IP/PORT ".
   Fails on a Unix-domain address. *)
let vp_extern_print = function
  | VpX (n, ADDR_INET (ia, p)) ->
      printf "vp[%d] = %s/%d " n (string_of_inet_addr ia) p
  | VpX (_, ADDR_UNIX _) ->
      failwith "Commlib: invalid argument to vp_extern_print";;
(* The vpinfo carried by a channel. *)
let vp_info = function VpCh (id, _, _) -> id;;
(* debug out channel *)
(* Printable address of socket [fd]: "ip:port" for Internet sockets, the
   path for Unix-domain ones. The address is the peer's when [peer] is true
   (the default) and the local one otherwise. Any exception is reported
   through debug and yields "fake". *)
let name_of_descr ?(peer=true) fd =
  let lookup = if peer then Unix.getpeername else Unix.getsockname in
  try
    begin match lookup fd with
    | ADDR_UNIX path -> path
    | ADDR_INET (ip, port) -> string_of_inet_addr ip ^ ":" ^ string_of_int port
    end
  with e ->
    debug (fun () -> printf "%t: exception %s in name_of_descr" timedcontext (Printexc.to_string e));
    "fake"
;;
(* Printable address of the socket wrapped by a channel (see name_of_descr). *)
let addr_of_vpch ?(peer=true) vpch =
  match vpch with VpCh (_, fd, _) -> name_of_descr ~peer fd;;
(* Close the socket wrapped by a channel, logging both endpoints first.
   Raises Failure "Error on closing channel: <reason>" if the close fails,
   keeping the underlying Unix error message instead of discarding it. *)
let vp_close (VpCh(i,fd,_)) =
  debug (fun () -> printf "%t: closing channel from [%d] to [%d] %s -> %s.\n" timedcontext (myvpinfo()) i (name_of_descr ~peer:false fd) (name_of_descr fd));
  try
    Unix.close fd (* debug (fun () -> printf "%t: NOT CLOSING FOR DEBUGGING, RESTORE!" timedcontext) *)
  with Unix_error (err, _, _) ->
    failwith ("Error on closing channel: " ^ error_message err);;
(*** SEND AND RECEIVE FUNCTIONS ***)
(* Write exactly [len] bytes of [buf], starting at [pos], to [fd]. Unix.write
   is called again on the remainder until everything has been written.
   This function does not itself check buffer boundaries. *)
let really_write fd buf pos len =
  let rec push offset remaining =
    if remaining > 0 then begin
      let written = Unix.write fd buf offset remaining in
      debug (fun () -> printf "%t: asked output for %d data, really wrote %d bytes" context remaining written);
      push (offset + written) (remaining - written)
    end
  in
  push pos len
;;
(* Read exactly [len] bytes from [fd] into [buf] starting at [pos], calling
   Unix.read repeatedly. On end of file a message is printed and End_of_file
   is raised. This function does not itself check buffer boundaries. *)
let unsafe_really_read fd buf pos len =
  let rec pull offset remaining =
    if remaining > 0 then begin
      let got = Unix.read fd buf offset remaining in
      debug ~mask:128 (fun () -> printf "%t: asked for %d data, really read %d bytes" context remaining got);
      if got = 0 then begin
        (* we got EOF *)
        printf "%t: eof on channel %s -> %s" timedcontext (name_of_descr fd) (name_of_descr ~peer:false fd);
        raise End_of_file
      end;
      pull (offset + got) (remaining - got)
    end
  in
  pull pos len
;;
(* Printable form of a Marshal flag, for debug messages. Every flag is
   rendered with a leading space, so that flags_to_string produces a
   space-separated list instead of gluing names together. *)
let flag_to_string = function
  | Marshal.Closures -> " Closures"
  | Marshal.No_sharing -> " No_sharing"
  (* Newer OCaml releases add flags (e.g. Compat_32). A wildcard, rather than
     naming them, keeps this compiling on older releases too. *)
  | _ -> " <other flag>"
;;
(* Concatenation of flag_to_string over a flag list ("" for no flags). *)
let flags_to_string fl = String.concat "" (List.map flag_to_string fl);;
(* Marshal [data] with [flags] and write the whole encoding to the socket of
   the given channel. Debug traces are emitted before and after the write. *)
let gen_cl_send (VpCh (n, fd, _)) data flags =
  debug (fun () ->
    printf "%t: sending data with flags %s to [%d] on channel %s -> %s.\n"
      timedcontext (flags_to_string flags) n
      (name_of_descr ~peer:false fd) (name_of_descr fd));
  let encoded = Marshal.to_string data flags in
  let size = String.length encoded in
  really_write fd encoded 0 size;
  debug (fun () ->
    printf "%t: sent data to [%d] on channel %s -> %s.\n"
      timedcontext n (name_of_descr ~peer:false fd) (name_of_descr fd))
;;
(* Send [data] on [ch], allowing functional values (closures) to be
   marshaled. *)
let cl_send ch data =
  let with_closures = [Marshal.Closures] in
  gen_cl_send ch data with_closures;;
(* Send [data] on [ch] with default marshaling (no flags, closures not
   allowed). *)
let nocl_send ch data =
  let no_flags = [] in
  gen_cl_send ch data no_flags;;
(* Shared receive buffer. We do our own buffering: see the note on buffered
   channels before receive_any. The buffer starts with room for a marshal
   header plus 128 bytes and is replaced by a larger one when needed. *)
let bsize=Marshal.header_size+128;;
let buf = ref (String.create bsize);;
(* Read one complete marshaled value from descriptor [ic]. The fixed-size
   header is read first; it gives the data size, and the data is then read
   right after it in [!buf]. When the message does not fit, a bigger buffer
   is allocated and stored back into [buf], so later large messages reuse it.
   (Previously the grown buffer was only bound locally and discarded.)
   Raises End_of_file if the peer closes the connection mid-message. *)
let marshal_read ic =
  unsafe_really_read ic !buf 0 Marshal.header_size;
  let data_size = Marshal.data_size !buf 0 in
  let total_size = Marshal.header_size + data_size in
  if String.length !buf < total_size then begin
    let bigger = String.create total_size in
    (* keep the header we already read *)
    String.blit !buf 0 bigger 0 Marshal.header_size;
    buf := bigger
  end;
  unsafe_really_read ic !buf Marshal.header_size data_size;
  Marshal.from_string !buf 0;;
(* just one receive function here *)
(* Receive one marshaled value from a channel, with debug traces before and
   after the read. *)
let receive vpch =
  let VpCh (n, fd, _) = vpch in
  debug (fun () ->
    printf "%t: reading data from [%d] on channel %s -> %s.\n"
      timedcontext n (name_of_descr fd) (name_of_descr ~peer:false fd));
  let value = marshal_read fd in
  debug (fun () ->
    printf "%t: read data from [%d] on channel %s -> %s.\n"
      timedcontext n (name_of_descr fd) (name_of_descr ~peer:false fd));
  value
;;
(* communicators... *)
(* receiving data from a group of channels *)
(* Notice: in_channel, out_channel are _buffered_ in Ocaml, so we cannot just
do a receive ic if we are alternating reads from a set of channels, as this
requires to use select to find a ready channel, while we have no way to know
if some additional data is already available in the input buffer of one of the channels (thanx, Jerome!)
Hence, we need to implement our own buffer.
*)
(* Block (no timeout) in select until one of the channels of [icl] is ready.
   Then read one marshaled value from the first ready descriptor and return
   it together with the channel it belongs to.
   If select reports exceptional conditions, a message is printed, and the
   call fails with "no ready channels, exiting" when nothing is readable. *)
let receive_any icl =
  let fdl = List.map (function VpCh (_, fd, _) -> fd) icl in
  debug (fun () -> printf "%t: selecting..." context; print_newline ());
  let (ready, _, exc) = Unix.select fdl [] fdl (-1.0) in
  debug (fun () -> printf "%t: selected..." context; print_newline ());
  if exc <> [] then begin
    printf "%t: exceptional conditions found on an input channel ..." timedcontext;
    if ready = [] then failwith "no ready channels, exiting"
  end;
  let fd = List.hd ready in
  let owner =
    try List.find (fun (VpCh (_, fd', _)) -> fd = fd') icl
    with Not_found -> failwith "Commlib.receive_any: no ready descriptor"
  in
  (marshal_read fd, owner)
;;
(* reexport establish_smart_server, converting it to our vp representation *)
(* Wrap the opened module's establish_smart_server so that the handler [f]
   receives a vp_chan (with the placeholder vpinfo) instead of a raw
   descriptor/address pair. *)
let establish_smart_server f =
  establish_smart_server
    (fun fd sockaddr -> f (VpCh (dummy_vpinfo, fd, sockaddr)));;
(** channel/network utility routines *)
(* connections at the fd level *)
(* Open a stream socket in the domain matching [sockaddr] and connect it.
   If the connection fails, the socket is closed and the exception is
   re-raised. *)
let fd_open_connection sockaddr =
  let domain =
    match sockaddr with
    | ADDR_INET _ -> PF_INET
    | ADDR_UNIX _ -> PF_UNIX
  in
  let sock = socket domain SOCK_STREAM 0 in
  (try connect sock sockaddr
   with exn -> close sock; raise exn);
  sock
;;
(* Connect a TCP socket to [(addr, port)]. While the connection is refused
   (the peer may not be listening yet), retry up to 10 times with a
   1-second pause. Other errors propagate at once.
   When every attempt was refused, raises
   Unix_error (ECONNREFUSED, "get_connection", "<addr>:<port>"). *)
let get_connection (addr,port) =
  let rec try_connect attempts_left =
    if attempts_left <= 0 then begin
      (* report the port we actually tried, not the default p3lport *)
      debug (fun () -> printf "%t: could not connect to %s on port %d" context (string_of_inet_addr addr) port);
      raise (Unix_error (ECONNREFUSED, "get_connection",
                         string_of_inet_addr addr ^ ":" ^ string_of_int port))
    end else
      try fd_open_connection (ADDR_INET (addr, port))
      with Unix_error (ECONNREFUSED, _, _) ->
        sleep 1;
        try_connect (attempts_left - 1)
  in
  try_connect 10
;;
(* accept a connection on a socket *)
(* Accept one connection on listening socket [s] and return the
   (client descriptor, client address) pair from Unix.accept. Fails if the
   client address is not an Internet address. *)
let doaccept s =
  debug (fun () -> printf "%t: accepting" context);
  match Unix.accept s with
  | (_, ADDR_INET (ia, p)) as accepted ->
      debug (fun () -> printf "%t: accepted on port %d from %s" context p (string_of_inet_addr ia));
      accepted
  | _ -> failwith "Commlib: invalid argument to doaccept";;
(* accept n incoming connections *)
(* Accept [i] connections on [sockin], one after the other, and return their
   descriptors in acceptance order ([] when [i <= 0]). [dbgstring] only
   appears in debug output. Fails if a client address is not an Internet
   address. *)
let do_accepts i sockin dbgstring =
  let rec loop remaining acc =
    if remaining <= 0 then List.rev acc
    else begin
      debug (fun () -> printf "PID %d: do_accept %s ACCEPTING" (Unix.getpid ()) dbgstring);
      let (c, unx_addr) = Unix.accept sockin in
      match unx_addr with
      | Unix.ADDR_INET (_, pn) ->
          debug (fun () -> printf "PID %d: do_accept ACCEPTED port num %d " (Unix.getpid ()) pn);
          loop (remaining - 1) (c :: acc)
      | _ -> failwith "Commlib: invalid argument to do_accepts"
    end
  in
  loop i []
;;
(* get the internet address, and optionally the port number, of a virtual processor from its string description
(can be IP number or symbolic name) *)
(* Parse a processor description of the form
     host[:port][#color][%capability]
   where host is an IP number or a symbolic name, and return
   ((inet_addr, port), color, capability).
   Defaults: port = p3lport, color = 0, capability = -1.
   Note that host#c%1 and host#c have the same meaning in strict mapping,
   which is capability=1, but different meanings in fuzzy mapping, which are
   capability=1 and "don't care". So we have to use -1 as "not specified"
   and 1 as "capability=1".
   Fails with "Commlib : proc_inet_addr : Unknown host <host>" when the host
   is neither an IP literal nor resolvable by gethostbyname. *)
let proc_inet_addr proc =
  (* [split_field s sep default] splits [s] at the first [sep] and parses the
     suffix as an integer. If [sep] is absent, or the suffix is not an
     integer, [s] is returned unchanged with [default]. Because of this
     lenient fallback, e.g. an IPv6 literal such as "::1" is passed on
     intact. *)
  let split_field s sep default =
    try
      let pos = String.index s sep in
      (String.sub s 0 pos,
       int_of_string (String.sub s (pos + 1) (String.length s - pos - 1)))
    with Not_found | Failure _ -> (s, default)
  in
  let (rest, cap) = split_field proc '%' (-1) in
  let (rest, col) = split_field rest '#' 0 in
  let (host, port) = split_field rest ':' p3lport in
  let ia =
    (* match any Failure: depending on its message text is fragile *)
    try inet_addr_of_string host
    with Failure _ ->
      (try (gethostbyname host).h_addr_list.(0)
       with Not_found ->
         failwith ("Commlib : proc_inet_addr : Unknown host " ^ host))
  in
  ((ia, port), col, cap)
;;
(*********** HIGHER LEVEL FUNCTIONS ****************)
(* get a vpchan structure, with initialization, out of vpinfo and ip address *)
(* Connect to node [n] listening at [(ia, p)] and return [n] paired with the
   resulting channel. *)
let vpchan_of_addr (n, (ia, p)) =
  let fd = get_connection (ia, p) in
  (n, VpCh (n, fd, ADDR_INET (ia, p)));;
(* a list of (output) channels out of a list of vpinfos *)
(* Open one output channel per external vp description in [vpchl], in list
   order. Each one is connected to its address, and this node's vpinfo
   (from [myvpdata]) is then sent on it so the receiver can identify us.
   Returns the channels, each tagged with the receiver's vpinfo. *)
let outchs_of_vps vpchl myvpdata =
  let my_id = vp_info myvpdata in
  let open_one (VpX (n, outaddr)) =
    let (ia, p) = get_addr_port outaddr in
    debug (fun () -> printf "%t: connecting a channel to %s port %d\n"
      context (string_of_inet_addr ia) p);
    let sout = get_connection (ia, p) in
    (* Unix.tcpsetsockopt sout TCP_NODELAY would need an extension to the
       OCaml unix library which is not there yet. *)
    debug (fun () -> printf "%t: connected a channel to %s port %d\n"
      context (string_of_inet_addr ia) p);
    (* the receiving node expects our vpinfo first *)
    debug (fun () -> printf "%t: Sending vpinfo %d to output node..." timedcontext my_id);
    let vpch = VpCh (n, sout, ADDR_INET (ia, p)) in
    cl_send vpch my_id;
    vpch
  in
  List.map open_one vpchl;;
(* Accept one incoming connection on this node's listening channel. Read the
   vpinfo the peer sends first (see outchs_of_vps) and return the accepted
   channel tagged with it. *)
let setup_receive_chan (VpCh (_, listen_fd, _)) =
  let (ufd, saddr) = doaccept listen_fd in
  debug (fun () -> printf "%t: Receiving vpinfo from input node..." timedcontext);
  let (peervpinfo : vpinfo) = marshal_read ufd in
  debug (fun () -> printf "%t: Received vpinfo %d from input node..." timedcontext peervpinfo);
  VpCh (peervpinfo, ufd, saddr);;
(* Rebuild the channels of [vpinchl] in the order of the vpinfos listed in
   [vpinfol]. Raises Not_found if a vpinfo of [vpinfol] has no channel in
   [vpinchl]. *)
let sameorderof vpinfol vpinchl =
  let table = List.map (function VpCh (id, fd, addr) -> (id, (fd, addr))) vpinchl in
  let rebuild id =
    match List.assoc id table with (fd, addr) -> VpCh (id, fd, addr)
  in
  List.map rebuild vpinfol;;
(* Accept one input connection per element of [icl] (a list of vpinfos) on
   [myvpchan]. The resulting channels are returned in the same vpinfo order
   as [icl]. [dbgstring] only appears in debug output.
   MIGHT GET AN ERROR HERE IN CASE OF MISMATCH: SHOULD BE SMARTER IN CASE
   OF PHASED RECONFIGURATION? *)
let setup_receive_chans icl myvpchan dbgstring =
  let accept_one _ = setup_receive_chan myvpchan in
  let unordered_chans = List.map accept_one icl in
  debug (fun () -> printf "%t: %s got all input channels" context dbgstring);
  sameorderof icl unordered_chans;;
(* Re-export [spawn] from the opened modules (presumably Server — confirm).
   It is eta-expanded to take exactly a function and its argument. *)
let spawn func argu = spawn func argu;;