(*****************************************************************************) (* *) (* Open Source License *) (* Copyright (c) 2018 Dynamic Ledger Solutions, Inc. *) (* Copyright (c) 2019-2021 Nomadic Labs, *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) (* to deal in the Software without restriction, including without limitation *) (* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) (* and/or sell copies of the Software, and to permit persons to whom the *) (* Software is furnished to do so, subject to the following conditions: *) (* *) (* The above copyright notice and this permission notice shall be included *) (* in all copies or substantial portions of the Software. *) (* *) (* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) (* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) (* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) (* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) (* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) (* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) (* DEALINGS IN THE SOFTWARE. *) (* *) (*****************************************************************************) module Message = Distributed_db_message module P2p_reader_event = Distributed_db_event.P2p_reader_event type p2p = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.net type connection = (Message.t, Peer_metadata.t, Connection_metadata.t) P2p.connection type callback = { notify_branch : P2p_peer.Id.t -> Block_locator.t -> unit; notify_head : P2p_peer.Id.t -> Block_hash.t -> Block_header.t -> Mempool.t -> unit; disconnection : P2p_peer.Id.t -> unit; } module Block_hash_cache : Ringo.CACHE_MAP with type key = Block_hash.t = (val Ringo.(map_maker ~replacement:LRU ~overflow:Strong ~accounting:Precise)) (Block_hash) type chain_db = { chain_store : Store.Chain.t; operation_db : Distributed_db_requester.Raw_operation.t; block_header_db : Distributed_db_requester.Raw_block_header.t; operations_db : Distributed_db_requester.Raw_operations.t; callback : callback; active_peers : P2p_peer.Set.t ref; active_connections : connection P2p_peer.Table.t; } type t = { p2p : p2p; gid : P2p_peer.Id.t; (** remote peer id *) conn : connection; peer_active_chains : chain_db Chain_id.Table.t; disk : Store.t; canceler : Lwt_canceler.t; mutable worker : unit Lwt.t; protocol_db : Distributed_db_requester.Raw_protocol.t; active_chains : chain_db Chain_id.Table.t; (** All chains managed by this peer **) unregister : unit -> unit; } (* performs [f chain_db] if the chain is active for the remote peer and [chain_db] is the chain_db corresponding to this chain id, otherwise does nothing (simply update peer metadata). *) let may_handle state chain_id f = match Chain_id.Table.find state.peer_active_chains chain_id with | None -> let meta = P2p.get_peer_metadata state.p2p state.gid in Peer_metadata.incr meta Inactive_chain ; Lwt.return_unit | Some chain_db -> f chain_db (* performs [f chain_db] if [chain_id] is active and [chain_db] is the chain_db corresponding to this chain id. *) let may_handle_global state chain_id f = match Chain_id.Table.find state.active_chains chain_id with | None -> Lwt.return_unit | Some chain_db -> f chain_db let find_pending_operations {peer_active_chains; _} h i = Chain_id.Table.to_seq_values peer_active_chains |> Seq.filter (fun chain_db -> Distributed_db_requester.Raw_operations.pending chain_db.operations_db (h, i)) |> Seq.first let find_pending_operation {peer_active_chains; _} h = Chain_id.Table.to_seq_values peer_active_chains |> Seq.filter (fun chain_db -> Distributed_db_requester.Raw_operation.pending chain_db.operation_db h) |> Seq.first let read_operation state h = (* Remember that seqs are lazy. The table is only traversed until a match is found, the rest is not explored. *) let open Lwt_syntax in Seq_s.of_seq (Chain_id.Table.to_seq state.active_chains) |> Seq_s.filter_map_s (fun (chain_id, chain_db) -> let+ v = Distributed_db_requester.Raw_operation.read_opt chain_db.operation_db h in Option.map (fun bh -> (chain_id, bh)) v) |> Seq_s.first let read_block {disk; _} h = let open Lwt_syntax in let* chain_stores = Store.all_chain_stores disk in List.find_map_s (fun chain_store -> let* o = Store.Block.read_block_opt chain_store h in let* o = match o with | None -> Store.Block.read_prechecked_block_opt chain_store h | Some b -> Lwt.return_some b in Option.map_s (fun b -> Lwt.return (Store.Chain.chain_id chain_store, b)) o) chain_stores let read_block_header db h = let open Lwt_syntax in let* o = read_block db h in match o with | None -> Lwt.return_none | Some (chain_id, block) -> Lwt.return_some (chain_id, Store.Block.header block) let read_predecessor_header {disk; _} h offset = Option.catch_os (fun () -> let open Lwt_syntax in let offset = Int32.to_int offset in let* chain_stores = Store.all_chain_stores disk in List.find_map_s (fun chain_store -> let* o = Store.Block.read_block_opt chain_store h ~distance:offset in match o with | None -> Lwt.return_none | Some block -> Lwt.return_some (Store.Block.header block)) chain_stores) let find_pending_block_header {peer_active_chains; _} h = Chain_id.Table.to_seq_values peer_active_chains |> Seq.filter (fun chain_db -> Distributed_db_requester.Raw_block_header.pending chain_db.block_header_db h) |> Seq.first let deactivate gid chain_db = chain_db.callback.disconnection gid ; chain_db.active_peers := P2p_peer.Set.remove gid !(chain_db.active_peers) ; P2p_peer.Table.remove chain_db.active_connections gid (* Active the chain_id for the remote peer. Is a nop if it is already activated. *) let activate state chain_id chain_db = match Chain_id.Table.find state.peer_active_chains chain_id with | Some _ -> () | None -> chain_db.active_peers := P2p_peer.Set.add state.gid !(chain_db.active_peers) ; P2p_peer.Table.add chain_db.active_connections state.gid state.conn ; Chain_id.Table.add state.peer_active_chains chain_id chain_db let my_peer_id state = P2p.peer_id state.p2p let handle_msg state msg = let open Lwt_syntax in let open Message in let meta = P2p.get_peer_metadata state.p2p state.gid in let* () = P2p_reader_event.(emit read_message) (state.gid, P2p_message.Message msg) in match msg with | Get_current_branch chain_id -> Peer_metadata.incr meta @@ Received_request Branch ; may_handle_global state chain_id @@ fun chain_db -> activate state chain_id chain_db ; let seed = {Block_locator.receiver_id = state.gid; sender_id = my_peer_id state} in let* current_head = Store.Chain.current_head chain_db.chain_store in let* locator = Store.Chain.compute_locator chain_db.chain_store current_head seed in Peer_metadata.update_responses meta Branch @@ P2p.try_send state.p2p state.conn @@ Current_branch (chain_id, locator) ; Lwt.return_unit | Current_branch (chain_id, locator) -> may_handle state chain_id @@ fun chain_db -> let {Block_locator.head_hash; head_header; history} = locator in let* known_invalid = List.exists_p (Store.Block.is_known_invalid chain_db.chain_store) (head_hash :: history) in if known_invalid then ( let* () = P2p.disconnect state.p2p state.conn in P2p.greylist_peer state.p2p state.gid ; Lwt.return_unit) else if not (Clock_drift.is_not_too_far_in_the_future head_header.shell.timestamp) then ( Peer_metadata.incr meta Future_block ; P2p_reader_event.(emit received_future_block) (head_hash, state.gid)) else ( chain_db.callback.notify_branch state.gid locator ; (* TODO discriminate between received advertisements and responses? *) Peer_metadata.incr meta @@ Received_advertisement Branch ; Lwt.return_unit) | Deactivate chain_id -> may_handle state chain_id @@ fun chain_db -> deactivate state.gid chain_db ; Chain_id.Table.remove state.peer_active_chains chain_id ; Lwt.return_unit | Get_current_head chain_id -> may_handle state chain_id @@ fun chain_db -> Peer_metadata.incr meta @@ Received_request Head ; let {Connection_metadata.disable_mempool; _} = P2p.connection_remote_metadata state.p2p state.conn in let* current_head = Store.Chain.current_head chain_db.chain_store in let head = Store.Block.header current_head in let* mempool = if disable_mempool then Lwt.return Mempool.empty else Store.Chain.mempool chain_db.chain_store in (* TODO bound the sent mempool size *) Peer_metadata.update_responses meta Head @@ P2p.try_send state.p2p state.conn @@ Current_head (chain_id, head, mempool) ; Lwt.return_unit | Current_head (chain_id, header, mempool) -> may_handle state chain_id @@ fun chain_db -> let header_hash = Block_header.hash header in let* known_invalid = Store.Block.is_known_invalid chain_db.chain_store header_hash in let {Connection_metadata.disable_mempool; _} = P2p.connection_local_metadata state.p2p state.conn in let known_invalid = known_invalid || (disable_mempool && mempool <> Mempool.empty) (* A non-empty mempool was received while mempool is deactivated, so the message is ignored. This should probably warrant a reduction of the sender's score. *) in if known_invalid then ( let* () = P2p.disconnect state.p2p state.conn in P2p.greylist_peer state.p2p state.gid ; Lwt.return_unit) else if not (Clock_drift.is_not_too_far_in_the_future header.shell.timestamp) then ( Peer_metadata.incr meta Future_block ; P2p_reader_event.(emit received_future_block) (header_hash, state.gid)) else ( chain_db.callback.notify_head state.gid header_hash header mempool ; (* TODO discriminate between received advertisements and responses? *) Peer_metadata.incr meta @@ Received_advertisement Head ; Lwt.return_unit) | Get_block_headers hashes -> Peer_metadata.incr meta @@ Received_request Block_header ; List.iter_p (fun hash -> let* o = read_block_header state hash in match o with | None -> Peer_metadata.incr meta @@ Unadvertised Block ; Lwt.return_unit | Some (_chain_id, header) -> Peer_metadata.update_responses meta Block_header @@ P2p.try_send state.p2p state.conn @@ Block_header header ; Lwt.return_unit) hashes | Block_header block -> ( let hash = Block_header.hash block in match find_pending_block_header state hash with | None -> Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> let* () = Distributed_db_requester.Raw_block_header.notify chain_db.block_header_db state.gid hash block in Peer_metadata.incr meta @@ Received_response Block_header ; Lwt.return_unit) | Get_operations hashes -> Peer_metadata.incr meta @@ Received_request Operations ; List.iter_p (fun hash -> let* o = read_operation state hash in match o with | None -> Peer_metadata.incr meta @@ Unadvertised Operations ; Lwt.return_unit | Some (_chain_id, op) -> Peer_metadata.update_responses meta Operations @@ P2p.try_send state.p2p state.conn @@ Operation op ; Lwt.return_unit) hashes | Operation operation -> ( let hash = Operation.hash operation in match find_pending_operation state hash with | None -> Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> let* () = Distributed_db_requester.Raw_operation.notify chain_db.operation_db state.gid hash operation in Peer_metadata.incr meta @@ Received_response Operations ; Lwt.return_unit) | Get_protocols hashes -> Peer_metadata.incr meta @@ Received_request Protocols ; List.iter_p (fun hash -> let* o = Store.Protocol.read state.disk hash in match o with | None -> Peer_metadata.incr meta @@ Unadvertised Protocol ; Lwt.return_unit | Some p -> Peer_metadata.update_responses meta Protocols @@ P2p.try_send state.p2p state.conn @@ Protocol p ; Lwt.return_unit) hashes | Protocol protocol -> let hash = Protocol.hash protocol in let* () = Distributed_db_requester.Raw_protocol.notify state.protocol_db state.gid hash protocol in Peer_metadata.incr meta @@ Received_response Protocols ; Lwt.return_unit | Get_operations_for_blocks blocks -> Peer_metadata.incr meta @@ Received_request Operations_for_block ; List.iter_p (fun (hash, ofs) -> let* o = read_block state hash in match o with | None -> Lwt.return_unit | Some (_, block) -> let ops, path = Store.Block.operations_path block ofs in Peer_metadata.update_responses meta Operations_for_block @@ P2p.try_send state.p2p state.conn @@ Operations_for_block (hash, ofs, ops, path) ; Lwt.return_unit) blocks | Operations_for_block (block, ofs, ops, path) -> ( match find_pending_operations state block ofs with | None -> Peer_metadata.incr meta Unexpected_response ; Lwt.return_unit | Some chain_db -> let* () = Distributed_db_requester.Raw_operations.notify chain_db.operations_db state.gid (block, ofs) (ops, path) in Peer_metadata.incr meta @@ Received_response Operations_for_block ; Lwt.return_unit) | Get_checkpoint chain_id -> ( Peer_metadata.incr meta @@ Received_request Checkpoint ; may_handle_global state chain_id @@ fun chain_db -> let* checkpoint_hash, _ = Store.Chain.checkpoint chain_db.chain_store in let* o = Store.Block.read_block_opt chain_db.chain_store checkpoint_hash in match o with | None -> Lwt.return_unit | Some checkpoint -> let checkpoint_header = Store.Block.header checkpoint in Peer_metadata.update_responses meta Checkpoint @@ P2p.try_send state.p2p state.conn @@ Checkpoint (chain_id, checkpoint_header) ; Lwt.return_unit) | Checkpoint _ -> (* This message is currently unused: it will be used for future bootstrap heuristics. *) Peer_metadata.incr meta @@ Received_response Checkpoint ; Lwt.return_unit | Get_protocol_branch (chain_id, proto_level) -> ( Peer_metadata.incr meta @@ Received_request Protocol_branch ; may_handle_global state chain_id @@ fun chain_db -> activate state chain_id chain_db ; let seed = {Block_locator.receiver_id = state.gid; sender_id = my_peer_id state} in let* o = Store.Chain.compute_protocol_locator chain_db.chain_store ~proto_level seed in match o with | Some locator -> Peer_metadata.update_responses meta Protocol_branch @@ P2p.try_send state.p2p state.conn @@ Protocol_branch (chain_id, proto_level, locator) ; Lwt.return_unit | None -> Lwt.return_unit) | Protocol_branch (_chain, _proto_level, _locator) -> (* This message is currently unused: it will be used for future multipass. *) Peer_metadata.incr meta @@ Received_response Protocol_branch ; Lwt.return_unit | Get_predecessor_header (block_hash, offset) -> ( Peer_metadata.incr meta @@ Received_request Predecessor_header ; let* o = read_predecessor_header state block_hash offset in match o with | None -> (* The peer is not expected to request blocks that are beyond our locator. *) Peer_metadata.incr meta @@ Unadvertised Block ; Lwt.return_unit | Some header -> Peer_metadata.update_responses meta Predecessor_header @@ P2p.try_send state.p2p state.conn @@ Predecessor_header (block_hash, offset, header) ; Lwt.return_unit) | Predecessor_header (_block_hash, _offset, _header) -> (* This message is currently unused: it will be used to improve bootstrapping. *) Peer_metadata.incr meta @@ Received_response Predecessor_header ; Lwt.return_unit let rec worker_loop state = let open Lwt_syntax in let* o = protect ~canceler:state.canceler (fun () -> P2p.recv state.p2p state.conn) in match o with | Ok msg -> let* () = handle_msg state msg in worker_loop state | Error _ -> Chain_id.Table.iter (fun _ -> deactivate state.gid) state.peer_active_chains ; state.unregister () ; Lwt.return_unit let run ~register ~unregister p2p disk protocol_db active_chains gid conn = let canceler = Lwt_canceler.create () in let state = { active_chains; protocol_db; p2p; disk; conn; gid; canceler; peer_active_chains = Chain_id.Table.create 17; worker = Lwt.return_unit; unregister; } in Chain_id.Table.iter (fun chain_id _chain_db -> Error_monad.dont_wait (fun () -> let meta = P2p.get_peer_metadata p2p gid in Peer_metadata.incr meta (Sent_request Branch) ; P2p.send p2p conn (Get_current_branch chain_id)) (fun trace -> Format.eprintf "Uncaught error: %a\n%!" Error_monad.pp_print_trace trace) (fun exc -> Format.eprintf "Uncaught exception: %s\n%!" (Printexc.to_string exc))) active_chains ; state.worker <- Lwt_utils.worker (Format.asprintf "db_network_reader.%a" P2p_peer.Id.pp_short gid) ~on_event:Internal_event.Lwt_worker_event.on_event ~run:(fun () -> worker_loop state) ~cancel:(fun () -> Error_monad.cancel_with_exceptions canceler) ; register state let shutdown s = Error_monad.cancel_with_exceptions s.canceler