swh:1:snp:a72e953ecd624a7df6e6196bbdd05851996c5e40
Raw File
Tip revision: e9263b7025725fb7297040318df06d5f86b94721 authored by Viral B. Shah on 14 February 2013, 00:49:00 UTC
Merge branch 'master' of github.com:JuliaLang/julia
Tip revision: e9263b7
socket.jl
## IP ADDRESS HANDLING ##
# Abstract supertype of the IP address types defined below (IPv4 and IPv6).
abstract IpAddr

# IPv4 address held as one 32-bit unsigned integer; the first octet occupies
# the most significant byte.
type IPv4 <: IpAddr
    host::Uint32

    # Wrap an already packed 32-bit value.
    function IPv4(host::Uint32)
        new(host)
    end

    # Pack the four octets of a.b.c.d into the 32-bit host value.
    function IPv4(a::Uint8,b::Uint8,c::Uint8,d::Uint8)
        packed = uint32(a)<<24 | uint32(b)<<16 | uint32(c)<<8 | d
        new(packed)
    end

    # Accept arbitrary integers; throws DomainError unless every octet
    # lies in 0..255.
    function IPv4(a::Integer,b::Integer,c::Integer,d::Integer)
        for octet in (a,b,c,d)
            if !(0 <= octet <= 255)
                throw(DomainError())
            end
        end
        IPv4(uint8(a),uint8(b),uint8(c),uint8(d))
    end
end

# Print an IPv4 address as "IPv4(a.b.c.d)" with each octet in decimal.
function show(io::IO,ip::IPv4)
    h = ip.host
    print(io,"IPv4(",dec((h>>24)&0xFF),".",
                     dec((h>>16)&0xFF),".",
                     dec((h>>8)&0xFF),".",
                     dec(h&0xFF),")")
end

# IPv6 address held as one 128-bit unsigned integer; the first 16-bit group
# occupies the most significant bits.
type IPv6 <: IpAddr
    host::Uint128

    # Wrap an already packed 128-bit value.
    function IPv6(host::Uint128)
        new(host)
    end

    # Pack eight 16-bit groups a:b:c:d:e:f:g:h into the 128-bit host value.
    function IPv6(a::Uint16,b::Uint16,c::Uint16,d::Uint16,
                  e::Uint16,f::Uint16,g::Uint16,h::Uint16)
        packed = uint128(a)<<(7*16) |
                 uint128(b)<<(6*16) |
                 uint128(c)<<(5*16) |
                 uint128(d)<<(4*16) |
                 uint128(e)<<(3*16) |
                 uint128(f)<<(2*16) |
                 uint128(g)<<(1*16) |
                 h
        new(packed)
    end

    # Accept arbitrary integers; throws DomainError unless every group
    # lies in 0..0xFFFF.
    function IPv6(a::Integer,b::Integer,c::Integer,d::Integer,
                  e::Integer,f::Integer,g::Integer,h::Integer)
        for group in (a,b,c,d,e,f,g,h)
            if !(0 <= group <= 0xFFFF)
                throw(DomainError())
            end
        end
        IPv6(uint16(a),uint16(b),uint16(c),uint16(d),
             uint16(e),uint16(f),uint16(g),uint16(h))
    end
end

# Print an IPv6 address as "IPv6(a:b:c:d:e:f:g:h)", where each of the eight
# 16-bit groups is written in hexadecimal, most significant group first.
function show(io::IO,ip::IPv6)
    print(io,"IPv6(")
    for i = 7:-1:0
        # Shift group i down to the low 16 bits, then mask it off.
        print(io,hex((ip.host>>(i*16))&0xFFFF))
        if i > 0
            print(io,":")
        end
    end
    print(io,")")
end

# An IP address paired with a 16-bit port number.
type InetAddr
    host::IpAddr
    port::Uint16

    # Throws DomainError when `port` is outside 0..typemax(Uint16);
    # otherwise stores it converted to Uint16.
    function InetAddr(host,port)
        0 <= port <= typemax(Uint16) || throw(DomainError())
        return new(host,uint16(port))
    end
end


## SOCKETS ##

# Abstract supertype of the socket types defined below (TcpSocket, UdpSocket);
# itself a subtype of AsyncStream.
abstract Socket <: AsyncStream

# TCP socket backed by a native handle obtained from `jl_make_tcp`.
type TcpSocket <: Socket
    handle::Ptr{Void}                 # native handle; C_NULL until created
    open::Bool                        # set to true by accept() on success
    line_buffered::Bool               # initialised to true
    buffer::IOBuffer                  # initialised to a fresh PipeBuffer()
    readcb::Callback                  # initialised to false
    readnotify::Vector{WaitTask}
    ccb::Callback                     # connect callback; initialised to false
    connectnotify::Vector{WaitTask}   # tasks parked in wait_accept()
    closecb::Callback                 # initialised to false
    closenotify::Vector{WaitTask}

    # Build a socket around an existing handle with empty notify lists and
    # no callbacks installed.
    TcpSocket(handle,open)=new(handle,open,true,PipeBuffer(),false,
                               WaitTask[],false,WaitTask[],false,WaitTask[])

    # Create a closed socket and ask `jl_make_tcp` for a handle on the
    # current event loop.  Raises an error if no handle is returned.
    function TcpSocket()
        this = TcpSocket(C_NULL,false)
        this.handle = ccall(:jl_make_tcp,Ptr{Void},(Ptr{Void},Any),
                            eventloop(),this)
        if (this.handle == C_NULL)
            # The failure here is handle creation, not reading.
            error("Failed to create tcp socket: ",_uv_lasterror())
        end
        this
    end
end

# UDP socket; carries the same fields as TcpSocket.
type UdpSocket <: Socket
    handle::Ptr{Void}                 # native handle; C_NULL until created
    open::Bool
    line_buffered::Bool               # initialised to true
    buffer::IOBuffer                  # initialised to a fresh PipeBuffer()
    readcb::Callback                  # initialised to false
    readnotify::Vector{WaitTask}
    ccb::Callback                     # initialised to false
    connectnotify::Vector{WaitTask}
    closecb::Callback                 # initialised to false
    closenotify::Vector{WaitTask}

    # Build a socket around an existing handle with empty notify lists and
    # no callbacks installed.
    UdpSocket(handle,open)=new(handle,open,true,PipeBuffer(),false,WaitTask[],
                               false,WaitTask[],false,WaitTask[])

    # Create a closed socket and request a native handle on the current
    # event loop.  Raises an error if no handle is returned, matching the
    # check performed by TcpSocket().
    function UdpSocket()
        this = UdpSocket(C_NULL,false)
        # NOTE(review): this calls `jl_make_tcp`, the same entry point used by
        # TcpSocket -- confirm whether a UDP-specific constructor was intended.
        this.handle = ccall(:jl_make_tcp,Ptr{Void},(Ptr{Void},Any),
                            eventloop(),this)
        if (this.handle == C_NULL)
            error("Failed to create udp socket: ",_uv_lasterror())
        end
        this
    end
end

# Print the connection state of a TcpSocket and the number of bytes
# available in its buffer.
function show(io::IO,sock::TcpSocket)
    state = sock.open ? "connected," : "disconnected,"
    print(io,"TcpSocket(",state,nb_available(sock.buffer)," bytes waiting)")
end

# Print the connection state of a UdpSocket and the number of bytes
# available in its buffer.
function show(io::IO,sock::UdpSocket)
    state = sock.open ? "connected," : "disconnected,"
    print(io,"UdpSocket(",state,nb_available(sock.buffer)," bytes waiting)")
end
# Call the C function `jl_tcp_init` with the given event loop pointer and
# return the pointer it yields.
function _jl_tcp_init(loop::Ptr{Void})
    ccall(:jl_tcp_init,Ptr{Void},(Ptr{Void},),loop)
end
# Call the C function `jl_udp_init` with the given event loop pointer and
# return the pointer it yields.
function _jl_udp_init(loop::Ptr{Void})
    ccall(:jl_udp_init,Ptr{Void},(Ptr{Void},),loop)
end

## VARIOUS METHODS TO BE MOVED TO BETTER LOCATION

# Call the C function `jl_connect_raw` with the socket's handle and a raw
# sockaddr pointer; returns its Int32 result.
function _jl_connect_raw(sock::TcpSocket,sockaddr::Ptr{Void})
    ccall(:jl_connect_raw,Int32,(Ptr{Void},Ptr{Void}),sock.handle,sockaddr)
end
# Call the C function `jl_sockaddr_from_addrinfo` on an addrinfo pointer and
# return the pointer it yields.
function _jl_sockaddr_from_addrinfo(addrinfo::Ptr{Void})
    ccall(:jl_sockaddr_from_addrinfo,Ptr{Void},(Ptr{Void},),addrinfo)
end
# Call the C function `jl_sockaddr_set_port` with a sockaddr pointer and a
# 16-bit port.
function _jl_sockaddr_set_port(ptr::Ptr{Void},port::Uint16)
    ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},Uint16),ptr,port)
end


## WAITING ##

# Accept a connection on `server` and return the client socket.
#
# An accept is attempted immediately.  If it fails with error code 4 (EAGAIN)
# the current task registers itself in `server.connectnotify`, marks itself
# not runnable and yields; each time it is resumed the accept is retried.
# Any other accept error, a resume status of -1, or an InterruptException
# delivered through yield() raises an error.
function wait_accept(server::TcpSocket)
    client = TcpSocket()
    if accept(server,client) == 0
        return client
    end
    code = _uv_lasterror()
    if code != 4 #EAGAIN
        error("accept: ", code, "\n")
    end
    task = current_task()
    waiter = WaitTask(task)
    while true
        # Park this task until it is resumed through connectnotify.
        push!(server.connectnotify,waiter)
        task.runnable = false
        args = yield()
        if isa(args,InterruptException)
            error(args)
        end
        # The second element of the resume arguments carries the status.
        status = args[2]::Int32
        if status == -1
            error("listen: ", _uv_lasterror(), "\n")
        end
        if accept(server,client) == 0
            return client
        end
        code = _uv_lasterror()
        if code != 4 #EAGAIN
            error("accept: ", code, "\n")
        end
    end
end

##

# Bind `sock` using the host and port stored in `addr`; forwards to the
# three-argument bind method.
function bind(sock::Socket, addr::InetAddr)
    bind(sock,addr.host,addr.port)
end
# Generic fallback: wrap `host` and `port` in an InetAddr (which validates
# the port) and forward to the InetAddr bind method.
# NOTE(review): that method forwards back to a three-argument bind, so this
# pair relies on a more specific method existing for the concrete socket and
# address types -- confirm for types other than TcpSocket.
function bind(sock::Socket, host::IpAddr, port)
    addr = InetAddr(host,port)
    bind(sock,addr)
end

# Error codes compared against the value returned by _uv_lasterror().
# UV_SUCCESS is checked in open_any_tcp_port; UV_EADDRINUSE is the one bind
# failure that bind(::TcpSocket, ...) reports by returning false instead of
# throwing.
const UV_SUCCESS = 0
const UV_EADDRINUSE = 5

# Bind a TCP socket to an IPv4 address and port through `jl_tcp_bind`.
# Port and host are passed through hton() before the call.
#
# Returns true on success and false when the call fails with UV_EADDRINUSE;
# any other failure throws UVError("bind").
function bind(sock::TcpSocket, host::IPv4, port::Uint16)
    status = ccall(:jl_tcp_bind, Int32, (Ptr{Void}, Uint16, Uint32),
                   sock.handle, hton(port), hton(host.host))
    if status != -1
        return true
    end
    if _uv_lasterror() != UV_EADDRINUSE
        throw(UVError("bind"))
    end
    return false
end

# Binding a TCP socket to an IPv6 address is not supported; always raises
# an error.
function bind(sock::TcpSocket, host::IPv6, port::Uint16)
    error("IPv6 Support not fully implemented")
end

##

# Registry of getaddrinfo callbacks: getaddrinfo() stores each callback here
# and _uv_hook_getaddrinfo() removes it again when the lookup completes.
# The binding is never reassigned (only the dictionary's contents change),
# so it is declared const to give the global a fixed type.
const callback_dict = ObjectIdDict()

# Completion hook for an address lookup.
#
# Removes `cb` from callback_dict, throws UVError("getaddrinfo callback")
# when `status` is -1, and otherwise invokes `cb` with an IPv4 or IPv6 value
# built from the sockaddr extracted from `addrinfo`.  The host value read
# from the sockaddr is passed through ntoh() first.
function _uv_hook_getaddrinfo(cb::Function,addrinfo::Ptr{Void}, status::Int32)
    delete!(callback_dict,cb)
    if status == -1
        throw(UVError("getaddrinfo callback"))
    end
    sockaddr = ccall(:jl_sockaddr_from_addrinfo,Ptr{Void},(Ptr{Void},),addrinfo)
    is_ip4 = ccall(:jl_sockaddr_is_ip4,Int,(Ptr{Void},),sockaddr) == 1
    if is_ip4
        host4 = ntoh(ccall(:jl_sockaddr_host4,Uint32,(Ptr{Void},),sockaddr))
        cb(IPv4(host4))
    else
        host6 = ntoh(ccall(:jl_sockaddr_host6,Uint128,(Ptr{Void},),sockaddr))
        cb(IPv6(host6))
    end
end

# Call the C function `jl_getaddrinfo` with the event loop, host name,
# service pointer and callback; returns its Int32 result.
function jl_getaddrinfo(loop::Ptr{Void}, host::ByteString, service::Ptr{Void},
                        cb::Function)
    ccall(:jl_getaddrinfo, Int32, (Ptr{Void}, Ptr{Uint8}, Ptr{Uint8}, Any),
          loop, host, service, cb)
end

# Start a lookup of `host` on the current event loop with no service given
# (C_NULL).  `cb` is stored in callback_dict before the request is issued;
# the value returned is the result of jl_getaddrinfo.
function getaddrinfo(cb::Function,host::ASCIIString)
    callback_dict[cb] = cb
    jl_getaddrinfo(eventloop(),host,C_NULL,cb)
end

# Blocking form of getaddrinfo: start the lookup with a callback that
# notifies the current task, mark the task not runnable, yield, and return
# the first element of what yield() delivers.
function getaddrinfo(host::ASCIIString)
    task = current_task()
    waiter = WaitTask(task)
    getaddrinfo(IP -> tasknotify([waiter],IP), host)
    task.runnable = false
    (ip,) = yield()
    return ip
end

##

# Start connecting `sock` to an IPv4 host and port through `jl_tcp4_connect`,
# installing `cb` as the socket's connect callback first.  Host and port are
# passed through hton(); a result of -1 is reported via uv_error.
function connect(cb::Function, sock::TcpSocket, host::IPv4, port::Uint16)
    sock.ccb = cb
    status = ccall(:jl_tcp4_connect,Int32,(Ptr{Void},Uint32,Uint16),
                   sock.handle,hton(host.host),hton(port))
    uv_error("connect",status == -1)
end

# Connect `sock` to an IPv4 host and port through `jl_tcp4_connect`, then
# call wait_connected(sock).  Host and port are passed through hton(); a
# result of -1 is reported via uv_error.
function connect(sock::TcpSocket, host::IPv4, port::Uint16)
    status = ccall(:jl_tcp4_connect,Int32,(Ptr{Void},Uint32,Uint16),
                   sock.handle,hton(host.host),hton(port))
    uv_error("connect",status == -1)
    wait_connected(sock)
end

# Resolve `host` with the blocking getaddrinfo, then connect `sock` to the
# resulting address and `port`.
function connect(sock::TcpSocket, host::ASCIIString, port)
    connect(sock,getaddrinfo(host),port)
end

# Connect callback installed by the socket-creating connect methods: throws
# UVError("connect callback") when `status` is -1 and otherwise does nothing.
function default_connectcb(sock,status)
    if status != -1
        return
    end
    throw(UVError("connect callback"))
end

# Callback form of connect-by-name: start a lookup of `host` and, once an
# address is delivered, start connecting `sock` to it with `cb` as the
# connect callback.
function connect(cb::Function, sock::TcpSocket, host::ASCIIString, port)
    getaddrinfo(ipaddr -> connect(cb,sock,ipaddr,port), host)
end


# Generate the convenience `connect` methods.  Each (args, forward_args) pair
# names an argument list and the arguments it is forwarded as; for every pair
# four methods are defined:
#   connect(sock, args...)      - forward on an existing socket
#   connect(cb, sock, args...)  - same, with a connect callback
#   connect(args...)            - create a TcpSocket, connect it, return it
#   connect(cb, args...)        - same, with a connect callback
# The (addr::InetAddr) pair used to be listed twice, which only defined the
# same methods a second time; it now appears once.
# NOTE(review): for the (host::ASCIIString, port) pair the first two generated
# methods forward to the identical argument list, so they rely on the more
# specific TcpSocket methods defined above -- confirm for other Socket types.
for (args,forward_args) in (((:(addr::InetAddr),), (:(addr.host),:(addr.port))),
                            ((:(host::IpAddr),:port),(:(InetAddr(host,port)),)),
                            ((:(host::ASCIIString),:port), (:host,:port)))
    @eval begin
        connect(sock::Socket,$(args...)) = connect(sock,$(forward_args...))
        connect(cb::Function,sock::Socket,$(args...)) =
            connect(cb,sock,$(forward_args...))
        function connect($(args...))
            sock = TcpSocket()
            sock.ccb = default_connectcb
            connect(sock,$(forward_args...))
            sock
        end
        function connect(cb::Function,$(args...))
            sock = TcpSocket()
            sock.ccb = default_connectcb
            connect(cb,sock,$(forward_args...))
            sock
        end
    end
end

##

# Create a TcpSocket, bind it to `host`:`port`, start listening on it and
# return it.  A false result from bind is reported via uv_error("listen", ...).
function listen(host::IPv4, port::Uint16)
    sock = TcpSocket()
    bound = bind(sock,host,port)
    uv_error("listen",!bound)
    listen(sock)
    return sock
end
# Listen on `port` at the IPv4 address whose host value is zero.
function listen(port::Integer)
    listen(IPv4(uint32(0)),uint16(port))
end
# Listen on the host and port stored in `addr`.
function listen(addr::InetAddr)
    listen(addr.host,addr.port)
end
# Generic fallback: wrap `host` and `port` in an InetAddr and forward to the
# InetAddr listen method.
# NOTE(review): that method forwards back to listen(host, port), so this pair
# relies on a more specific method existing for the concrete address type
# (one is defined for IPv4) -- confirm behaviour for other IpAddr subtypes.
function listen(host::IpAddr, port::Uint16)
    listen(InetAddr(host,port))
end

# Create a listening socket from `args`, then install `cb` as its connect
# callback and return the socket.
function listen(cb::Callback,args...)
    sock = listen(args...)
    sock.ccb = cb
    sock
end
# Install `cb` as the connect callback of an existing socket, then start
# listening on it; returns the result of listen(sock).
function listen(cb::Callback,sock::Socket)
    sock.ccb = cb
    listen(sock)
end

##

# Call the C function `uv_accept` on the raw server and client handles;
# returns its Int32 result.
function _jl_tcp_accept(server::Ptr{Void},client::Ptr{Void})
    ccall(:uv_accept,Int32,(Ptr{Void},Ptr{Void}),server,client)
end
# Try to accept a pending connection from `server` into `client`.  Returns
# the raw status code; `client` is marked open only when the status is 0.
function accept(server::TcpSocket,client::TcpSocket)
    status = _jl_tcp_accept(server.handle,client.handle)
    status == 0 && (client.open = true)
    return status
end
# Accept a pending connection from `server` into a newly created TcpSocket,
# mark it open and return it.  A status of -1 is reported via uv_error.
function accept(server::TcpSocket)
    client = TcpSocket()
    status = _jl_tcp_accept(server.handle,client.handle)
    uv_error("accept",status == -1)
    client.open = true
    return client
end

## Utility functions

# Find a TCP port that can be bound and listened on, starting at
# `default_port` and incrementing by one after every failed attempt.
#
# Returns (port, socket) for the first port on which both bind and
# listen(cb, sock) succeed.  A failed attempt closes its socket.  Throws
# UVError when the last error is neither UV_SUCCESS nor UV_EADDRINUSE, and
# raises an error when the port counter comes back around to `default_port`.
function open_any_tcp_port(cb::Callback,default_port)
    addr = InetAddr(IPv4(uint32(0)),default_port)
    while true
        sock = TcpSocket()
        if bind(sock,addr) && listen(cb,sock)
            return (addr.port,sock)
        end
        # Capture both error codes before the socket is closed.
        code = _uv_lasterror()
        syscode = _uv_lastsystemerror()
        sock.open = true #need to make close() work
        close(sock)
        if !(code == UV_SUCCESS || code == UV_EADDRINUSE)
            throw(UVError("open_any_tcp_port",code,syscode))
        end
        addr.port += 1
        if addr.port == default_port
            error("Not a single port is available.")
        end
    end
end
# Variant without a callback: passes `false` in the callback position.
function open_any_tcp_port(default_port)
    open_any_tcp_port(false,default_port)
end
back to top