Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file gs_transport_connection.ml
(*****************************************************************************)
(*                                                                           *)
(* Open Source License                                                       *)
(* Copyright (c) 2023 Nomadic Labs, <contact@nomadic-labs.com>               *)
(* Copyright (c) 2023 Functori, <contact@functori.com>                       *)
(*                                                                           *)
(* Permission is hereby granted, free of charge, to any person obtaining a   *)
(* copy of this software and associated documentation files (the "Software"),*)
(* to deal in the Software without restriction, including without limitation *)
(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
(* and/or sell copies of the Software, and to permit persons to whom the     *)
(* Software is furnished to do so, subject to the following conditions:      *)
(*                                                                           *)
(* The above copyright notice and this permission notice shall be included   *)
(* in all copies or substantial portions of the Software.                    *)
(*                                                                           *)
(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
(* DEALINGS IN THE SOFTWARE.                                                 *)
(*                                                                           *)
(*****************************************************************************)

module Worker = Gs_interface.Worker_instance
open Gs_interface.Worker_instance

(* Log events emitted by the handlers defined in this file. *)
module Events = struct
  include Internal_event.Simple

  let section = ["gossipsub"; "transport"; "event"]

  (* [prefix name] is the section components joined with ['_'], followed by
     ['-'] and [name]. The joined section is computed once, when [prefix]
     itself is defined. *)
  let prefix =
    let joined_section = String.concat "_" section in
    fun suffix -> joined_section ^ "-" ^ suffix

  (* Emitted when a message must be sent to a peer for which no connection
     is found. *)
  let no_connection_for_peer =
    declare_1
      ~section
      ~name:(prefix "no_connection_for_peer")
      ~msg:"No running connection found for peer {peer}"
      ~level:Notice
      ~pp1:P2p_peer.Id.pp
      ("peer", P2p_peer.Id.encoding)

  (* Emitted when the application callback returned [Ok ()] for a message. *)
  let message_notified_to_app =
    declare_1
      ~section
      ~name:(prefix "message_notified_to_app")
      ~msg:"Successfully notified message id {message_id} to the application"
      ~level:Info
      ~pp1:Worker.GS.Message_id.pp
      ("message_id", Types.Message_id.encoding)

  (* Emitted when the application callback returned an error for a message. *)
  let app_message_callback_failed =
    declare_2
      ~section
      ~name:(prefix "app_message_callback_failed")
      ~msg:"Callback failed for message id {message_id}. Failure is {failure}"
      ~level:Warning
      ~pp1:Worker.GS.Message_id.pp
      ~pp2:pp_print_trace
      ("message_id", Types.Message_id.encoding)
      ("failure", trace_encoding)

  (* Emitted when [P2p.send] returned an error. *)
  let send_p2p_message_failed =
    declare_2
      ~section
      ~name:(prefix "p2p_send_failed")
      ~msg:"Sending P2P message to {peer} failed with error {failure}"
      ~level:Warning
      ~pp1:P2p_peer.Id.pp
      ~pp2:pp_print_trace
      ("peer", P2p_peer.Id.encoding)
      ("failure", trace_encoding)

  (* Emitted just before a message is handed to [P2p.send]. *)
  let send_p2p_message =
    declare_2
      ~section
      ~name:(prefix "p2p_send")
      ~msg:"Sending to {peer} P2P message {message}"
      ~level:Debug
      ~pp1:P2p_peer.Id.pp
      ~pp2:Transport_layer_interface.pp_p2p_message
      ("peer", P2p_peer.Id.encoding)
      ("message", Transport_layer_interface.p2p_message_encoding)
end

(** This module implements a cache of alternative peers (aka PX peers), that is,
peers we are not (yet) connected to, advertised by P2P neighbors within
Prune messages.
The cache associates a P2P point to each pair made of the advertised peer
and the peer that advertised it. In case of redundancy, the last advertised
point is kept in the cache.
The cache is in theory not bounded. However, some invariants on the way
{!insert} and {!drop} are used below ensure that we remove the added entries very
quickly.
*)

module PX_cache : sig
  (** The cache data structure for advertised alternative PXs. *)
  type t

  (** Who an entry comes from: either the peer that advertised the PX peer
      ([PX peer]) or [Trusted]. *)
  type origin = Worker.peer_origin

  (** Create a new cache data structure. The [size] parameter is an indication
      on the size of internal table storing data. Its default value is
      [2048]. *)
  val create : ?size:int -> unit -> t

  (** [insert t ~origin ~px_peer point] associates the given [point] to
      [(origin, px_peer)] pair of peers. If a point already exists for the pair,
      it is overwritten. *)
  val insert :
    t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t -> unit

  (** [find_opt t ~origin ~px_peer] returns the point associated to the entry
      [(origin, px_peer)] in the cache, if any. *)
  val find_opt :
    t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t option

  (** [drop t ~origin ~px_peer] drops the entry [(origin, px_peer)] from the
      cache. *)
  val drop : t -> origin:origin -> px_peer:P2p_peer.Id.t -> unit
end = struct
  type origin = Worker.peer_origin

  (* Cache keys: the advertised peer together with who advertised it. *)
  type key = {origin : origin; px_peer : P2p_peer.Id.t}

  module Table = Hashtbl.Make (struct
    type t = key

    (* Two keys are equal when they designate the same PX peer and have the
       same origin (same advertiser, or both trusted). *)
    let equal k1 k2 =
      P2p_peer.Id.equal k1.px_peer k2.px_peer
      &&
      match (k1.origin, k2.origin) with
      | PX advertiser1, PX advertiser2 ->
          P2p_peer.Id.equal advertiser1 advertiser2
      | Trusted, Trusted -> true
      | PX _, _ | Trusted, _ -> false

    (* Mixes the hash of the PX peer with the hash of the advertiser, using
       distinct multipliers for the [PX] and [Trusted] cases. *)
    let hash {origin; px_peer} =
      let px_peer_hash = P2p_peer.Id.hash px_peer in
      match origin with
      | PX advertiser -> (px_peer_hash * 31) + P2p_peer.Id.hash advertiser
      | Trusted -> px_peer_hash * 17
  end)

  type t = P2p_point.Id.t Table.t

  let create ?(size = 2048) () = Table.create size

  (* [Table.replace] is used so that a previous binding is overwritten. *)
  let insert table ~origin ~px_peer point =
    let key = {origin; px_peer} in
    Table.replace table key point

  let drop table ~origin ~px_peer =
    let key = {origin; px_peer} in
    Table.remove table key

  let find_opt table ~origin ~px_peer =
    let key = {origin; px_peer} in
    Table.find_opt table key
end

(* [px_peer_of_peer p2p_layer peer] returns the public IP address and port at which
[peer] could be reached. For that, it first inspects information transmitted
via connection metadata by the remote [peer]. If the address or port are
missing from the connection's metadata, the function inspects the Internet
connection link's information. It returns [None] if it does not manage to get
that information with both methods. *)
let px_peer_of_peer p2p_layer peer =
  let open Transport_layer_interface in
  match P2p.find_connection_by_peer_id p2p_layer peer with
  | None ->
      (* No connection to [peer]: nothing can be said about its point. *)
      None
  | Some conn ->
      (* The address and the port are resolved independently of each other:
         for each of them, the value advertised by the remote peer in the
         connection metadata ({!P2p.connection_remote_metadata}) is preferred,
         and the value found in {!P2p.connection_info} is used as a fallback
         when the advertised one is missing. Typically, people either
         advertise both an address and a port, or just a port. *)
      let Types.P2P.Metadata.Connection.
            {advertised_net_addr; advertised_net_port; is_bootstrap_peer = _} =
        P2p.connection_remote_metadata p2p_layer conn
      in
      let {P2p_connection.Info.id_point = conn_addr, conn_port_opt; _} =
        P2p.connection_info p2p_layer conn
      in
      let addr = Option.value advertised_net_addr ~default:conn_addr in
      (* The result is [None] when no port is known from either source. *)
      Option.either advertised_net_port conn_port_opt
      |> Option.map (fun port -> {point = (addr, port); peer})

(* FIXME: https://gitlab.com/tezos/tezos/-/issues/6637
When adding an RPC to trust peers/points, use this function when trusting a peer
via an RPC. *)(* This function inserts the point of a trusted peer into the [px_cache]
table. *)
let cache_point_of_trusted_peer p2p_layer px_cache peer =
  match px_peer_of_peer p2p_layer peer with
  | None ->
      (* No point could be determined for [peer]: nothing to cache. *)
      ()
  | Some {Transport_layer_interface.point; peer = px_peer} ->
      PX_cache.insert px_cache ~origin:Trusted ~px_peer point

(** This handler forwards information about connections established by the P2P
layer to the Gossipsub worker.
Note that a connection is considered [outbound] only if we initiated it and
we trust the point or the peer we are connecting to. Consequently, PX peers
are not considered outgoing connections by default, as they are not trusted
unless explicitly specified otherwise.
Indeed, Gossipsub tries to maintain a threshold of outbound connections per
topic. So, we don't automatically set connections we initiate to PX peers as
outbound to avoid possible love bombing attacks. The Rust version also
implements a way to mitigate this risk, but not the Go implementation.
*)
let new_connections_handler px_cache gs_worker p2p_layer peer conn =
  let P2p_connection.Info.{id_point = addr, port_opt; _} =
    P2p.connection_info p2p_layer conn
  in
  let Types.P2P.Metadata.Connection.{is_bootstrap_peer = bootstrap; _} =
    P2p.connection_remote_metadata p2p_layer conn
  in
  let pool_opt = P2p.pool p2p_layer in
  (* [trusted_in_pool is_trusted arg] asks the pool whether [arg] is trusted.
     When there is no pool the answer is [true]: it doesn't matter in fake
     networks where pool is None. *)
  let trusted_in_pool is_trusted arg =
    match pool_opt with None -> true | Some pool -> is_trusted pool arg
  in
  let trusted_peer = trusted_in_pool P2p_pool.Peers.get_trusted peer in
  (* A point can only be looked up when the connection's port is known. *)
  let trusted_point =
    match port_opt with
    | None -> false
    | Some port -> trusted_in_pool P2p_pool.Points.get_trusted (addr, port)
  in
  let trusted = trusted_peer || trusted_point in
  (* TODO: https://gitlab.com/tezos/tezos/-/issues/5584
     Add the ability to have direct peers. *)
  let direct = false in
  (* Record the point of a trusted peer in the PX cache before notifying the
     worker. *)
  if trusted then cache_point_of_trusted_peer p2p_layer px_cache peer ;
  Worker.p2p_input
    gs_worker
    (Worker.New_connection {peer; direct; trusted; bootstrap})

(** This handler forwards information about P2P disconnections to the Gossipsub
worker. *)
let disconnections_handler gs_worker peer =
  let event = Worker.Disconnection {peer} in
  Worker.p2p_input gs_worker event

(* This function translates a Worker p2p_message to the type of messages sent
via the P2P layer. The two types don't coincide because of Prune. *)
let wrap_p2p_message p2p_layer msg =
  let module W = Worker in
  let module T = Transport_layer_interface in
  match msg with
  | W.Graft {topic} -> T.Graft {topic}
  | W.Prune {topic; px; backoff} ->
      (* On the wire, each advertised peer is sent together with its point.
         Peers for which [px_peer_of_peer] finds no point are left out. *)
      let px = Seq.filter_map (px_peer_of_peer p2p_layer) px in
      T.Prune {topic; px; backoff}
  | W.IHave {topic; message_ids} -> T.IHave {topic; message_ids}
  | W.IWant {message_ids} -> T.IWant {message_ids}
  | W.Subscribe {topic} -> T.Subscribe {topic}
  | W.Unsubscribe {topic} -> T.Unsubscribe {topic}
  | W.Message_with_header {message; topic; message_id} ->
      T.Message_with_header {message; topic; message_id}

(* This function translates a message received via the P2P layer to a Worker
p2p_message. The two types don't coincide because of Prune. *)
let unwrap_p2p_message p2p_layer ~from_peer px_cache msg =
  let module W = Worker in
  let module I = Transport_layer_interface in
  (* Keeps only the peer id of an advertised PX peer. If we have no
     connection to that peer, its point is stored in [px_cache] with
     [from_peer] as the advertiser.
     NOTE(review): this runs inside [Seq.map], so the insertion presumably
     happens only when (and each time) the resulting sequence is traversed --
     confirm this is intended. *)
  let remember_advertised_peer {I.point; peer} =
    (match P2p.find_connection_by_peer_id p2p_layer peer with
    | None ->
        PX_cache.insert px_cache ~origin:(PX from_peer) ~px_peer:peer point
    | Some _ -> ()) ;
    peer
  in
  match msg with
  | I.Graft {topic} -> W.Graft {topic}
  | I.Prune {topic; px; backoff} ->
      let px = Seq.map remember_advertised_peer px in
      W.Prune {topic; px; backoff}
  | I.IHave {topic; message_ids} -> W.IHave {topic; message_ids}
  | I.IWant {message_ids} -> W.IWant {message_ids}
  | I.Subscribe {topic} -> W.Subscribe {topic}
  | I.Unsubscribe {topic} -> W.Unsubscribe {topic}
  | I.Message_with_header {message; topic; message_id} ->
      W.Message_with_header {message; topic; message_id}

(* [try_connect_point ?expected_peer_id p2p_layer point] asks the P2P layer to
   connect to [point], unless there is no pool or the pool already has a
   connection for that point. The result of the connection attempt is
   ignored. *)
let try_connect_point ?expected_peer_id p2p_layer point =
  let open Lwt_syntax in
  match P2p.pool p2p_layer with
  | None -> return_unit
  | Some pool -> (
      match P2p_pool.Connection.find_by_point pool point with
      | Some _ -> (* already connected. *) return_unit
      | None ->
          let* (_ : _ P2p.connection tzresult) =
            P2p.connect ?expected_peer_id p2p_layer point
          in
          return_unit)

(* [try_connect p2p_layer px_cache ~px_peer ~origin] tries to connect to the
   point cached for [(origin, px_peer)], if any. Entries whose origin is a
   [PX] peer are dropped from the cache after the attempt. *)
let try_connect p2p_layer px_cache ~px_peer ~origin =
  let open Lwt_syntax in
  (* If there is some [point] associated to [px_peer] and advertised by [origin]
     on the [px_cache], we will try to connect to it. *)
  match PX_cache.find_opt px_cache ~px_peer ~origin with
  | None -> return_unit
  | Some point ->
      (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5799
         We may have an issue as described by the following scenario:
         - A legit pair [(point, peer)] is already known by P2P, but the remote
         peer is disconnected for some reason;
         - Some (malicious) peer advertises a fake px_peer [(point, peer')], where
         [peer'] is not the real peer id associated to the [point];
         - We try to connect to [point], setting at the same time that [peer']
         is the expected peer id for this point;
         - The connection fails, but the old (correct) association [(point,
         peer)] is lost.
         We may want to revert [peer'] association to [point]. But we still have
         an issue in case [(point, peer)] is actually the fake info and [(point,
         peer')] the legit association but the node is disconnected when trying
         to connect to it.
         This implementation will be hardened once we add the notion of "signed
         records" found, e.g., in Rust version, to check that the advertised
         (peer, point) pair alongside a timestamp are not faked. *)
      let* () =
        try_connect_point ~expected_peer_id:px_peer p2p_layer point
      in
      (match origin with
      | Trusted -> () (* Don't drop trusted points. *)
      | PX _ -> PX_cache.drop px_cache ~px_peer ~origin) ;
      return_unit

(** This handler pops and processes the items put by the worker in the p2p
output stream. The out messages are sent to the corresponding peers and the
directives to the P2P layer to connect or disconnect peers are handled. *)
let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache =
  let open Lwt_syntax in
  (* only log sending of GS control messages *)
  let is_control_message = function
    | Message_with_header _ -> false
    | _ -> true
  in
  (* Sends [p2p_message] to [to_peer] over its connection, if there is one. A
     failure of [P2p.send] is logged, not propagated. *)
  let send_to_peer ~to_peer p2p_message =
    match P2p.find_connection_by_peer_id p2p_layer to_peer with
    | None ->
        (* This could happen when the peer is disconnected or the
           connection is accepted but not running (authenticated) yet. *)
        (* TODO: https://gitlab.com/tezos/tezos/-/issues/5649
           Are there weird cases in which there is no connection
           associated to the peer, but the peer is still registered as
           connected on the GS side? *)
        Events.(emit no_connection_for_peer to_peer)
    | Some conn -> (
        let msg = wrap_p2p_message p2p_layer p2p_message in
        let* () =
          if is_control_message p2p_message then
            Events.(emit send_p2p_message (to_peer, msg))
          else return_unit
        in
        let* (res : unit tzresult) = P2p.send p2p_layer conn msg in
        match res with
        | Ok () -> return_unit
        | Error err -> Events.(emit send_p2p_message_failed (to_peer, err)))
  in
  (* Handles a single item popped from the worker's p2p output stream. *)
  let handle_output p2p_output =
    match p2p_output with
    | Worker.Out_message {to_peer; p2p_message} ->
        send_to_peer ~to_peer p2p_message
    | Disconnect {peer} -> (
        match P2p.find_connection_by_peer_id p2p_layer peer with
        | None -> return_unit
        | Some conn ->
            P2p.disconnect ~reason:"disconnected by Gossipsub" p2p_layer conn)
    | Connect {peer; origin} ->
        try_connect p2p_layer px_cache ~px_peer:peer ~origin
    | Connect_point {point} -> try_connect_point p2p_layer point
    | Forget {peer; origin} ->
        PX_cache.drop px_cache ~px_peer:peer ~origin:(PX origin) ;
        return_unit
    | Kick {peer} -> (
        match P2p.pool p2p_layer with
        | None -> return_unit
        | Some pool -> P2p_pool.Peers.ban pool peer)
  in
  let rec loop output_stream =
    let* p2p_output = Worker.Stream.pop output_stream in
    let* () = handle_output p2p_output in
    loop output_stream
  in
  loop (Worker.p2p_output_stream gs_worker)

(** This handler forwards p2p messages received via Octez p2p to the Gossipsub
worker. *)
let transport_layer_inputs_handler gs_worker p2p_layer advertised_px_cache =
  let open Lwt_syntax in
  let rec forward_next () =
    let* conn, msg = P2p.recv_any p2p_layer in
    (* The sender is the peer at the other end of the connection the message
       was received on. *)
    let {P2p_connection.Info.peer_id = from_peer; _} =
      P2p.connection_info p2p_layer conn
    in
    let p2p_message =
      unwrap_p2p_message p2p_layer ~from_peer advertised_px_cache msg
    in
    Worker.p2p_input gs_worker (Worker.In_message {from_peer; p2p_message}) ;
    forward_next ()
  in
  forward_next ()

(** This loop pops messages from application output stream and calls the given
[app_messages_callback] on them. *)
let app_messages_handler gs_worker ~app_messages_callback =
  let open Lwt_syntax in
  (* Logs the outcome of the application callback for [message_id]. *)
  let log_outcome message_id = function
    | Ok () -> Events.(emit message_notified_to_app message_id)
    | Error err -> Events.(emit app_message_callback_failed (message_id, err))
  in
  let rec loop app_output_stream =
    let* Worker.{message; message_id; topic = _} =
      Worker.Stream.pop app_output_stream
    in
    let* res = app_messages_callback message message_id in
    let* () = log_outcome message_id res in
    loop app_output_stream
  in
  loop (Worker.app_output_stream gs_worker)

(* [activate gs_worker p2p_layer ~app_messages_callback] creates the PX cache,
   registers the connection and disconnection handlers on the P2P layer, and
   returns a promise joining the three handler loops defined above. *)
let activate gs_worker p2p_layer ~app_messages_callback =
  let px_cache = PX_cache.create () in
  (* Register a handler to notify new P2P connections to GS. *)
  P2p.on_new_connection
    p2p_layer
    (new_connections_handler px_cache gs_worker p2p_layer) ;
  (* Register a handler to notify P2P disconnections to GS. *)
  P2p.on_disconnection p2p_layer (disconnections_handler gs_worker) ;
  Lwt.join
    [
      gs_worker_p2p_output_handler gs_worker p2p_layer px_cache;
      transport_layer_inputs_handler gs_worker p2p_layer px_cache;
      app_messages_handler gs_worker ~app_messages_callback;
    ]