swh:1:snp:505c374fd75bb208ae4e9a54e64bb310bc49295e
Tip revision: df12b817879737855b82b460a09fa443c3d83ca4 authored by Romain Bardou on 04 August 2020, 15:58:58 UTC
Docker: use images with libgcc
Docker: use images with libgcc
Tip revision: df12b81
test_p2p_socket.ml
(*****************************************************************************)
(* *)
(* Open Source License *)
(* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. <contact@tezos.com> *)
(* Copyright (c) 2019-2020 Nomadic Labs, <contact@nomadic-labs.com> *)
(* *)
(* Permission is hereby granted, free of charge, to any person obtaining a *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *)
(* and/or sell copies of the Software, and to permit persons to whom the *)
(* Software is furnished to do so, subject to the following conditions: *)
(* *)
(* The above copyright notice and this permission notice shall be included *)
(* in all copies or substantial portions of the Software. *)
(* *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *)
(* DEALINGS IN THE SOFTWARE. *)
(* *)
(*****************************************************************************)
include Internal_event.Legacy_logging.Make (struct
let name = "test.p2p.connection"
end)
let addr = ref Ipaddr.V6.localhost
let canceler = Lwt_canceler.create () (* unused *)
let proof_of_work_target = Crypto_box.make_target 16.
let id1 = P2p_identity.generate proof_of_work_target
let id2 = P2p_identity.generate proof_of_work_target
let id0 =
(* Luckily, this will be an insufficient proof of work! *)
P2p_identity.generate (Crypto_box.make_target 0.)
let version =
{
Network_version.chain_name =
Distributed_db_version.Name.of_string "SANDBOXED_TEZOS";
distributed_db_version = Distributed_db_version.zero;
p2p_version = P2p_version.zero;
}
type metadata = unit
let conn_meta_config : metadata P2p_params.conn_meta_config =
{
conn_meta_encoding = Data_encoding.empty;
conn_meta_value = (fun () -> ());
private_node = (fun _ -> false);
}
let rec listen ?port addr =
let tentative_port =
match port with None -> 1024 + Random.int 8192 | Some port -> port
in
let uaddr = Ipaddr_unix.V6.to_inet_addr addr in
let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in
Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ;
Lwt.catch
(fun () ->
Lwt_unix.bind main_socket (ADDR_INET (uaddr, tentative_port))
>>= fun () ->
Lwt_unix.listen main_socket 1 ;
Lwt.return (main_socket, tentative_port))
(function
| Unix.Unix_error ((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _)
when port = None ->
listen addr
| exn ->
Lwt.fail exn)
let sync ch =
Process.Channel.push ch ()
>>=? fun () -> Process.Channel.pop ch >>=? fun () -> return_unit
let rec sync_nodes nodes =
iter_p (fun p -> Process.receive p) nodes
>>=? fun () ->
iter_p (fun p -> Process.send p ()) nodes >>=? fun () -> sync_nodes nodes
let sync_nodes nodes =
sync_nodes nodes
>>= function
| Ok () | Error (Exn End_of_file :: _) ->
return_unit
| Error _ as err ->
Lwt.return err
let run_nodes client server =
listen !addr
>>= fun (main_socket, port) ->
Process.detach ~prefix:"server: " (fun channel ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
server channel sched main_socket
>>=? fun () -> P2p_io_scheduler.shutdown sched >>= fun () -> return_unit)
>>=? fun server_node ->
Process.detach ~prefix:"client: " (fun channel ->
Lwt_utils_unix.safe_close main_socket
>>= (function
| Error trace ->
Format.eprintf "Uncaught error: %a\n%!" pp_print_error trace ;
Lwt.return_unit
| Ok () ->
Lwt.return_unit)
>>= fun () ->
let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in
client channel sched !addr port
>>=? fun () -> P2p_io_scheduler.shutdown sched >>= fun () -> return_unit)
>>=? fun client_node ->
let nodes = [server_node; client_node] in
Lwt.ignore_result (sync_nodes nodes) ;
Process.wait_all nodes
let raw_accept sched main_socket =
P2p_fd.accept main_socket
>>= fun (fd, sockaddr) ->
let fd = P2p_io_scheduler.register sched fd in
let point =
match sockaddr with
| Lwt_unix.ADDR_UNIX _ ->
assert false
| Lwt_unix.ADDR_INET (addr, port) ->
(Ipaddr_unix.V6.of_inet_addr_exn addr, port)
in
Lwt.return (fd, point)
let accept sched main_socket =
raw_accept sched main_socket
>>= fun (fd, point) ->
P2p_socket.authenticate
~canceler
~proof_of_work_target
~incoming:true
fd
point
id1
version
conn_meta_config
let raw_connect sched addr port =
P2p_fd.socket PF_INET6 SOCK_STREAM 0
>>= fun fd ->
let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in
P2p_fd.connect fd uaddr
>>= fun () ->
let fd = P2p_io_scheduler.register sched fd in
Lwt.return fd
let connect sched addr port id =
raw_connect sched addr port
>>= fun fd ->
P2p_socket.authenticate
~canceler
~proof_of_work_target
~incoming:false
fd
(addr, port)
id
version
conn_meta_config
>>=? fun (info, auth_fd) ->
_assert (not info.incoming) __LOC__ ""
>>=? fun () ->
_assert (P2p_peer.Id.compare info.peer_id id1.peer_id = 0) __LOC__ ""
>>=? fun () -> return auth_fd
let is_connection_closed = function
| Error (Tezos_p2p_services.P2p_errors.Connection_closed :: _) ->
true
| Ok _ ->
false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let is_decoding_error = function
| Error (Tezos_p2p_services.P2p_errors.Decoding_error _ :: _) ->
true
| Ok _ ->
false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
module Crypto_test = struct
(* maximal size of the buffer *)
let bufsize = (1 lsl 16) - 1
let header_length = 2
(* The size of extra data added by encryption. *)
let tag_length = Crypto_box.tag_length
(* The number of bytes added by encryption + header *)
let extrabytes = header_length + tag_length
let max_content_length = bufsize - extrabytes
type data = {
channel_key : Crypto_box.channel_key;
mutable local_nonce : Crypto_box.nonce;
mutable remote_nonce : Crypto_box.nonce;
}
let () = assert (tag_length >= header_length)
let write_chunk fd cryptobox_data msg =
let msg_length = Bytes.length msg in
fail_unless
(msg_length <= max_content_length)
Tezos_p2p_services.P2p_errors.Invalid_message_size
>>=? fun () ->
let encrypted_length = tag_length + msg_length in
let payload_length = header_length + encrypted_length in
let tag = Bytes.make tag_length '\x00' in
let cmsg = Bytes.copy msg in
let local_nonce = cryptobox_data.local_nonce in
cryptobox_data.local_nonce <- Crypto_box.increment_nonce local_nonce ;
Crypto_box.fast_box_noalloc cryptobox_data.channel_key local_nonce tag cmsg ;
let payload = Bytes.make payload_length '\x00' in
TzEndian.set_int16 payload 0 encrypted_length ;
Bytes.blit tag 0 payload header_length tag_length ;
Bytes.blit cmsg 0 payload extrabytes msg_length ;
return (Unix.write fd payload 0 payload_length)
>>=? fun i ->
_assert (payload_length = i) __LOC__ "" >>=? fun () -> return_unit
let read_chunk fd cryptobox_data =
let header_buf = Bytes.create header_length in
return (Unix.read fd header_buf 0 header_length)
>>=? fun i ->
_assert (header_length = i) __LOC__ ""
>>=? fun () ->
let encrypted_length = TzEndian.get_uint16 header_buf 0 in
assert (encrypted_length >= tag_length) ;
let msg_length = encrypted_length - tag_length in
let tag = Bytes.make tag_length '\x00' in
return (Unix.read fd tag 0 tag_length)
>>=? fun i ->
_assert (tag_length = i) __LOC__ ""
>>=? fun () ->
let msg = Bytes.make msg_length '\x00' in
( if msg_length > 0 then return (Unix.read fd msg 0 msg_length)
else return 0 )
>>=? fun i ->
_assert (msg_length = i) __LOC__ ""
>>=? fun () ->
let remote_nonce = cryptobox_data.remote_nonce in
cryptobox_data.remote_nonce <- Crypto_box.increment_nonce remote_nonce ;
match
Crypto_box.fast_box_open_noalloc
cryptobox_data.channel_key
remote_nonce
tag
msg
with
| false ->
fail Tezos_p2p_services.P2p_errors.Decipher_error
| true ->
return msg
let (sk, pk, pkh) = Crypto_box.random_keypair ()
let zero_nonce = Crypto_box.zero_nonce
let channel_key = Crypto_box.precompute sk pk
let (in_fd, out_fd) = Unix.pipe ()
let data = {channel_key; local_nonce = zero_nonce; remote_nonce = zero_nonce}
let wrap () =
Alcotest.test_case "ACK" `Quick (fun () ->
Lwt_main.run
(let msg = Bytes.of_string "test" in
write_chunk out_fd data msg
>>= fun _ ->
read_chunk in_fd data
>>= function
| Ok res when Bytes.equal msg res ->
Lwt.return_unit
| Ok res ->
Format.kasprintf
Stdlib.failwith
"Error : %s <> %s"
(Bytes.to_string res)
(Bytes.to_string msg)
| Error error ->
Format.kasprintf Stdlib.failwith "%a" pp_print_error error))
end
module Low_level = struct
let simple_msg = Rand.generate (1 lsl 4)
let client _ch sched addr port =
let msg = Bytes.create (Bytes.length simple_msg) in
raw_connect sched addr port
>>= fun fd ->
P2p_io_scheduler.read_full fd msg
>>=? fun () ->
_assert (Bytes.compare simple_msg msg = 0) __LOC__ ""
>>=? fun () -> P2p_io_scheduler.close fd >>=? fun () -> return_unit
let server _ch sched socket =
raw_accept sched socket
>>= fun (fd, _point) ->
P2p_io_scheduler.write fd simple_msg
>>=? fun () -> P2p_io_scheduler.close fd >>=? fun _ -> return_unit
let run _dir = run_nodes client server
end
module Nack = struct
let encoding = Data_encoding.bytes
let is_rejected = function
| Error (Tezos_p2p_services.P2p_errors.Rejected_by_nack _ :: _) ->
true
| Ok _ ->
false
| Error err ->
log_notice "Error: %a" pp_print_error err ;
false
let server _ch sched socket =
accept sched socket
>>=? fun (info, auth_fd) ->
_assert info.incoming __LOC__ ""
>>=? fun () ->
_assert (P2p_peer.Id.compare info.peer_id id2.peer_id = 0) __LOC__ ""
>>=? fun () ->
P2p_socket.nack auth_fd P2p_rejection.No_motive []
>>= fun () -> return_unit
let client _ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>= fun conn ->
_assert (is_rejected conn) __LOC__ "" >>=? fun () -> return_unit
let run _dir = run_nodes client server
end
module Nacked = struct
let encoding = Data_encoding.bytes
let server _ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>= fun conn ->
_assert (Nack.is_rejected conn) __LOC__ "" >>=? fun () -> return_unit
let client _ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.nack auth_fd P2p_rejection.No_motive []
>>= fun () -> return_unit
(* This test is skipped because its result on the CI is not deterministic *)
let run _dir = return_unit
end
module Simple_message = struct
let encoding = Data_encoding.bytes
let simple_msg = Rand.generate (1 lsl 4)
let simple_msg2 = Rand.generate (1 lsl 4)
let server ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg2 msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let client ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg2
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
module Chunked_message = struct
let encoding = Data_encoding.bytes
let simple_msg = Rand.generate (1 lsl 8)
let simple_msg2 = Rand.generate (1 lsl 8)
let server ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler ~binary_chunks_size:21 auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg2 msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let client ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler ~binary_chunks_size:21 auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg2
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
module Oversized_message = struct
let encoding = Data_encoding.bytes
let simple_msg = Rand.generate (1 lsl 17)
let simple_msg2 = Rand.generate (1 lsl 17)
let server ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg2 msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let client ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.write_sync conn simple_msg2
>>=? fun () ->
P2p_socket.read conn
>>=? fun (_msg_size, msg) ->
_assert (Bytes.compare simple_msg msg = 0) __LOC__ ""
>>=? fun () ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
module Close_on_read = struct
let encoding = Data_encoding.bytes
let simple_msg = Rand.generate (1 lsl 4)
let server ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
sync ch >>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let client ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
sync ch
>>=? fun () ->
P2p_socket.read conn
>>= fun err ->
_assert (is_connection_closed err) __LOC__ ""
>>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
module Close_on_write = struct
let encoding = Data_encoding.bytes
let simple_msg = Rand.generate (1 lsl 4)
let server ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.close conn >>= fun _stat -> sync ch >>=? fun () -> return_unit
let client ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
sync ch
>>=? fun () ->
Lwt_unix.sleep 0.1
>>= fun () ->
P2p_socket.write_sync conn simple_msg
>>= fun err ->
_assert (is_connection_closed err) __LOC__ ""
>>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
module Garbled_data = struct
let encoding =
let open Data_encoding in
dynamic_size @@ option @@ string
(* generate a fixed garbled_msg to avoid 'Data_encoding.Binary.Await
_', which blocks 'make test' *)
let garbled_msg =
let buf = Bytes.create (1 lsl 4) in
TzEndian.set_int32 buf 0 (Int32.of_int 4) ;
TzEndian.set_int32 buf 4 (Int32.of_int (-1)) ;
TzEndian.set_int32 buf 8 (Int32.of_int (-1)) ;
TzEndian.set_int32 buf 12 (Int32.of_int (-1)) ;
buf
let server _ch sched socket =
accept sched socket
>>=? fun (_info, auth_fd) ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.raw_write_sync conn garbled_msg
>>=? fun () ->
P2p_socket.read conn
>>= fun err ->
_assert (is_connection_closed err) __LOC__ ""
>>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let client _ch sched addr port =
connect sched addr port id2
>>=? fun auth_fd ->
P2p_socket.accept ~canceler auth_fd encoding
>>=? fun conn ->
P2p_socket.read conn
>>= fun err ->
_assert (is_decoding_error err) __LOC__ ""
>>=? fun () -> P2p_socket.close conn >>= fun _stat -> return_unit
let run _dir = run_nodes client server
end
let log_config = ref None
let spec =
Arg.
[ ( "--addr",
String (fun p -> addr := Ipaddr.V6.of_string_exn p),
" Listening addr" );
( "-v",
Unit
(fun () ->
log_config :=
Some
(Lwt_log_sink_unix.create_cfg
~rules:"test.p2p.connection -> info; p2p.connection -> info"
())),
" Log up to info msgs" );
( "-vv",
Unit
(fun () ->
log_config :=
Some
(Lwt_log_sink_unix.create_cfg
~rules:
"test.p2p.connection -> debug; p2p.connection -> debug"
())),
" Log up to debug msgs" ) ]
let init_logs = lazy (Internal_event_unix.init ?lwt_log_sink:!log_config ())
let wrap n f =
Alcotest.test_case n `Quick (fun () ->
Lwt_main.run
( Lazy.force init_logs
>>= fun () ->
f ()
>>= function
| Ok () ->
Lwt.return_unit
| Error error ->
Format.kasprintf Stdlib.failwith "%a" pp_print_error error ))
let main () =
let anon_fun _num_peers = raise (Arg.Bad "No anonymous argument.") in
let usage_msg = "Usage: %s.\nArguments are:" in
Arg.parse spec anon_fun usage_msg ;
Alcotest.run
~argv:[|""|]
"tezos-p2p"
[ ( "p2p-connection.",
[ wrap "low-level" Low_level.run;
wrap "nack" Nack.run;
wrap "nacked" Nacked.run;
wrap "simple-message" Simple_message.run;
wrap "chunked-message" Chunked_message.run;
wrap "oversized-message" Oversized_message.run;
wrap "close-on-read" Close_on_read.run;
wrap "close-on-write" Close_on_write.run;
wrap "garbled-data" Garbled_data.run;
Crypto_test.wrap () ] ) ]
let () =
Sys.catch_break true ;
try main () with _ -> ()