Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file vat.ml
open Lwt.Infix
open Capnp_rpc_lwt

module Log = Capnp_rpc.Debug.Log
module ID_map = Auth.Digest.Map

(* A vat: tracks the CapTP connections of one local endpoint, keyed by the
   digest of the remote peer, and builds sturdy refs that connect on demand. *)
module Make (Network : S.NETWORK) (Underlying : Mirage_flow.S) = struct
  module CapTP = CapTP_capnp.Make (Network)

  let hash = `SHA256 (* Only support a single hash for now *)

  (* An out-going connection attempt that has not finished yet. *)
  type connection_attempt = (CapTP.t, Capnp_rpc.Exception.t) result Lwt.t

  type t = {
    network : Network.t;
    switch : Lwt_switch.t option;
    secret_key : Auth.Secret_key.t Lazy.t;
    address : Network.Address.t option;
    restore : Restorer.t;
    tags : Logs.Tag.set;
    connection_removed : unit Lwt_condition.t;        (* Fires when a connection is removed *)
    mutable connecting : connection_attempt ID_map.t; (* Out-going connections being attempted. *)
    mutable connections : CapTP.t ID_map.t;           (* Accepted connections *)
    mutable anon_connections : CapTP.t list;          (* Connections not using TLS. *)
  }

  (* [create ?switch ?tags ?restore ?address ~secret_key network] makes a vat
     with no connections. A hook is registered on [switch] that disconnects
     every connection with a [`Disconnected] exception, cancels every pending
     attempt and empties the three tables. *)
  let create ?switch ?(tags=Logs.Tag.empty) ?(restore=Restorer.none) ?address ~secret_key network =
    let t = {
      network;
      switch;
      secret_key;
      address;
      restore;
      tags;
      connection_removed = Lwt_condition.create ();
      connecting = ID_map.empty;
      connections = ID_map.empty;
      anon_connections = [];
    } in
    Lwt_switch.add_hook switch (fun () ->
        let ex = Capnp_rpc.Exception.v ~ty:`Disconnected "Vat shut down" in
        ID_map.bindings t.connections |> Lwt_list.iter_p (fun (_, c) -> CapTP.disconnect c ex) >>= fun () ->
        t.connections <- ID_map.empty;
        Lwt_list.iter_p (fun c -> CapTP.disconnect c ex) t.anon_connections >|= fun () ->
        t.anon_connections <- [];
        ID_map.iter (fun _ th -> Lwt.cancel th) t.connecting;
        t.connecting <- ID_map.empty;
      );
    t

  (* Start CapTP on [endpoint] and record it in [t.connections] under the
     peer's ID, replacing any previous entry for that ID. When [switch] is
     turned off the entry is removed (only if it is still this connection),
     the connection is disconnected and [t.connection_removed] is broadcast. *)
  let add_tls_connection t ~switch endpoint =
    let conn = CapTP.connect ~tags:t.tags ~restore:t.restore endpoint in
    let peer_id = Endpoint.peer_id endpoint in
    t.connections <- ID_map.add peer_id conn t.connections;
    Lwt_switch.add_hook (Some switch) (fun () ->
        begin match ID_map.find peer_id t.connections with
          | Some x when x == conn -> t.connections <- ID_map.remove peer_id t.connections
          | Some _ (* Already replaced by a new one? *)
          | None -> ()
        end;
        CapTP.disconnect conn (Capnp_rpc.Exception.v ~ty:`Disconnected "Switch turned off") >|= fun () ->
        Lwt_condition.broadcast t.connection_removed ()
      );
    conn

  (* [add_connection t ~switch ~mode endpoint] registers a newly established
     [endpoint] and returns the connection the caller should use. That is
     either a new connection over [endpoint] or, when a connection to the same
     peer already exists and is preferred, the existing one (in which case
     [switch] is turned off). [mode] says whether we initiated [endpoint]
     ([`Connect]) or accepted it ([`Accept]). *)
  let add_connection t ~switch ~(mode:[`Accept|`Connect]) endpoint =
    let tags = t.tags in
    let peer_id = Endpoint.peer_id endpoint in
    (* Use [Auth.Digest.equal], as [connect_auth] does, rather than polymorphic equality. *)
    if Auth.Digest.equal peer_id Auth.Digest.insecure then (
      (* Unauthenticated peers have no usable ID, so they are kept in a plain list. *)
      let conn = CapTP.connect ~tags ~restore:t.restore endpoint in
      t.anon_connections <- conn :: t.anon_connections;
      Lwt_switch.add_hook (Some switch) (fun () ->
          t.anon_connections <- List.filter ((!=) conn) t.anon_connections;
          CapTP.disconnect conn (Capnp_rpc.Exception.v ~ty:`Disconnected "Switch turned off") >|= fun () ->
          Lwt_condition.broadcast t.connection_removed ()
        );
      Lwt.return conn
    ) else match ID_map.find peer_id t.connections with
      | None -> Lwt.return @@ add_tls_connection t ~switch endpoint
      | Some existing ->
        Log.info (fun f -> f ~tags:t.tags "Trying to add a connection, but we already have one for this vat");
        (* This can happen if two vats call each other at exactly the same time.
           Ideally, we would seamlessly merge the connections, but for now just
           reject one of them and wait for the user to retry.
           We must ensure that both ends discard the same connection though!

           The connection to keep is the one initiated by the peer with the
           highest vat ID, i.e.

           - we are connecting and have the greater ID =>
             [conn] was initiated by us, the endpoint with the higher ID
           - we are accepting and don't have the greater ID =>
             [conn] was initiated by the peer, which has the higher ID

           In those cases, we keep the new [conn].
           Otherwise, we keep the existing one.

           There are several cases to consider:

           - We decide to drop our new out-bound connection [conn] and use the existing
             in-bound one we have already accepted. The peer knows that it has accepted
             [conn] but might not know that [existing] was accepted too yet.
             If it gets confirmation of [existing] first, it will drop [conn] and just use
             that. If it gets the rejection of [conn] first, it will attempt to reconnect
             and join its existing connection attempt with [existing], which will soon
             succeed.

           - We decide to drop our existing out-bound connection and use the new
             in-bound one [conn]. When our user tries to reconnect, they will
             find the new in-bound one and succeed. The peer knows it accepted
             [conn] but may not be sure of [existing] yet. Either way, it will
             continue with [conn] without trouble.

           - We decide to drop our newly accepted in-bound connection [conn]
             and use the existing out-bound one. Since the peer has already
             approved [existing], it no longer cares about this one.

           - We decide to drop our previously-accepted in-bound connection and
             use our new out-bound one [conn]. When the peer accepted our out-bound
             connection it already switched away from the old one.

           (It could also happen if the peer initiated two connections with the
           same digest, but a good peer won't do that.)
        *)
        let my_id = Auth.Secret_key.digest ~hash (Lazy.force t.secret_key) in
        (* Deliberately left as the original comparison: both ends must compute the same ordering. *)
        let keep_new = (my_id > peer_id) = (mode = `Connect) in
        if keep_new then (
          let conn = add_tls_connection t ~switch endpoint in
          let reason = Capnp_rpc.Exception.v "Closing duplicate connection" in
          CapTP.disconnect existing reason >|= fun () ->
          conn
        ) else (
          Lwt_switch.turn_off switch >|= fun () ->
          existing
        )

  (* The address this vat was configured with, if any. *)
  let public_address t = t.address

  (* Connect to [addr], register the endpoint with [add_connection] and
     bootstrap [service] over the resulting connection. A [`Msg] error from
     the network is returned as a [Capnp_rpc.Exception.t]. *)
  let connect_anon t addr ~service =
    let switch = Lwt_switch.create () in
    Network.connect t.network ~switch ~secret_key:t.secret_key addr >>= function
    | Error (`Msg m) -> Lwt.return @@ Error (Capnp_rpc.Exception.v m)
    | Ok ep ->
      add_connection t ~switch ep ~mode:`Connect >|= fun conn ->
      Ok (CapTP.bootstrap conn service)

  (* Start a new out-going connection attempt to [remote_id] at [addr],
     recording it in [t.connecting] while it is in progress so that concurrent
     callers can wait for the same attempt. On success, bootstraps [service]. *)
  let initiate_connection t remote_id addr service =
    (* We need to start a new connection attempt. *)
    let switch = Lwt_switch.create () in
    let conn =
      Network.connect t.network ~switch ~secret_key:t.secret_key addr >>= function
      | Error (`Msg m) -> Lwt.return @@ Error (Capnp_rpc.Exception.v m)
      | Ok ep ->
        add_connection t ~switch ep ~mode:`Connect >|= fun conn ->
        Ok conn
    in
    t.connecting <- ID_map.add remote_id conn t.connecting;
    (* Remove the entry however the attempt ends. If it were only removed on
       fulfilment, a rejected or cancelled attempt would stay in the map and
       every later [connect_auth] for this peer would join it and fail too. *)
    Lwt.finalize
      (fun () -> conn)
      (fun () ->
         t.connecting <- ID_map.remove remote_id t.connecting;
         Lwt.return_unit)
    >|= function
    | Ok conn -> Ok (CapTP.bootstrap conn service)
    | Error _ as e -> e

  (* [connect_auth t remote_id addr ~service] gets [service] from the vat with
     ID [remote_id]:
     - if [remote_id] is our own ID, restore the service locally;
     - if a connection exists but is disconnecting, wait for a removal
       notification and try again;
     - if a connection exists, bootstrap over it;
     - if an attempt is already pending, wait for it;
     - otherwise start a new attempt. *)
  let rec connect_auth t remote_id addr ~service =
    let my_id = Auth.Secret_key.digest ~hash (Lazy.force t.secret_key) in
    if Auth.Digest.equal remote_id my_id then
      Restorer.restore t.restore service
    else match ID_map.find remote_id t.connections with
      | Some conn when CapTP.disconnecting conn ->
        Lwt_condition.wait t.connection_removed >>= fun () ->
        connect_auth t remote_id addr ~service
      | Some conn ->
        (* Already connected; use that. *)
        Lwt.return @@ Ok (CapTP.bootstrap conn service)
      | None ->
        match ID_map.find remote_id t.connecting with
        | None -> initiate_connection t remote_id addr service
        | Some conn ->
          (* We're already trying to establish a connection, wait for that. *)
          conn >|= function
          | Ok conn -> Ok (CapTP.bootstrap conn service)
          | Error _ as e -> e

  (* Wrap the pair [sr] = (address, service) as a sturdy ref object whose
     [connect] method picks the anonymous or authenticated path from the
     address digest. *)
  let make_sturdy_ref t sr =
    Cast.sturdy_of_raw @@ object (_ : Private.Capnp_core.sturdy_ref)
      method connect =
        let (addr, service) = sr in
        let remote_id = Network.Address.digest addr in
        Lwt_result.map Cast.cap_to_raw (
          (* Use [Auth.Digest.equal], as [connect_auth] does, rather than polymorphic equality. *)
          if Auth.Digest.equal remote_id Auth.Digest.insecure then connect_anon t addr ~service
          else connect_auth t remote_id addr ~service
        )

      method to_uri_with_secrets =
        Network.Address.to_uri sr
    end

  (* A sturdy ref for [service] at this vat's own address.
     Raises [Failure] if the vat was created without an address. *)
  let sturdy_ref t service : 'a Sturdy_ref.t =
    match t.address with
    | None -> failwith "sturdy_ref: vat was not configured with an address"
    | Some address -> make_sturdy_ref t (address, service)

  let export _t sr =
    (* [t] isn't used currently. However, requiring it does emphasise that importing/exporting
       is a somewhat privileged operation (as it reveals the secret tokens in the sturdy ref). *)
    (Cast.sturdy_to_raw sr)#to_uri_with_secrets

  (* The URI (including secrets) of the sturdy ref for service [id] at this vat. *)
  let sturdy_uri t id = sturdy_ref t id |> export t

  (* Parse [uri] into a sturdy ref, passing any parse error through unchanged. *)
  let import t uri =
    match Network.Address.parse_uri uri with
    | Error _ as e -> e
    | Ok sr -> Ok (make_sturdy_ref t sr)

  (* Like [import], but raises [Failure] with the error message on failure. *)
  let import_exn t uri =
    match import t uri with
    | Ok x -> x
    | Error (`Msg m) -> failwith m

  (* Print the vat's address, or a note that it has none. *)
  let pp_vat_id f = function
    | None -> Fmt.string f "Client-only vat"
    | Some addr -> Fmt.pf f "Vat at %a" Network.Address.pp addr

  let dump_connecting f (id, _) =
    Fmt.pf f "%a => (pending)" Auth.Digest.pp id

  let dump_id_conn f (id, conn) =
    Fmt.pf f "%a => %a" Auth.Digest.pp id CapTP.dump conn

  (* Print the vat's address and its pending, connected and anonymous connections. *)
  let dump f t =
    Fmt.pf f "@[<v2>%a@,Connecting: %a@,Connected: %a@,Anonymous: %a@]"
      pp_vat_id t.address
      (ID_map.dump dump_connecting) t.connecting
      (ID_map.dump dump_id_conn) t.connections
      (Fmt.Dump.list CapTP.dump) t.anon_connections
end