Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file tcp.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500open!Coreopen!ImportmoduleUnix=Unix_syscallsmoduleHost=Unix.HostmoduleSocket=Unix.SocketmoduleWhere_to_connect=structtype'addrt={socket_type:'addrSocket.Type.t;remote_address:unit->'addrDeferred.t;local_address:'addroption;info:Sexp.t}letsexp_of_t_{info;_}=infotypeinet=Socket.Address.Inet.tt[@@derivingsexp_of]typeunix=Socket.Address.Unix.tt[@@derivingsexp_of]letremote_addresst=t.remote_address()letcreate_local_address~bind_to_address~bind_to_port=letport=Option.valuebind_to_port~default:0inmatchbind_to_addresswith|None->Socket.Address.Inet.create_bind_any~port|Someinet_addr->Socket.Address.Inet.create~portinet_addr;;letof_host_and_port?bind_to_address?bind_to_port({Host_and_port.host;port}ashp)={socket_type=Socket.Type.tcp;remote_address=(fun()->Unix.Inet_addr.of_string_or_getbynamehost>>|funinet_addr->Socket.Address.Inet.createinet_addr~port);local_address=Some(create_local_address~bind_to_address~bind_to_port);info=[%sexp(hp:Host_and_port.t)]};;letof_filefile={socket_type=Socket.Type.unix;remote_address=(fun()->return(Socket.Address.Unix.createfile));local_address=None;info=[%sexp_of:string]file};;letof_inet_address?bind_to_address?bind_to_portaddress={socket_type=Socket.Type.tcp;remote_address=(fun()->returnaddress);local_address=Some(create_local_address~bind_to_address~bind_to_port);info=[%sexp_of:Socket.Address.Inet.t]address};;letof_unix_addressaddress={socket_type=Socket.Type.unix;remote_address=(fun()->returnaddress);local_address=None;info=[%sexp_of:Socket.Address.Unix.t]address};;endletclose_sock_on_errorsf=try_with~name:"Tcp.close_sock_on_error"f>>|function|Okv->v|Errore->(* [close] may fail, but we don't really care, since it will fail
asynchronously. The error we really care about is [e], and the
[raise_error] will cause the current monitor to see that. *)don't_wait_for(Unix.close(Socket.fds));raisee;;letreader_writer_of_sock?buffer_age_limit?reader_buffer_size?writer_buffer_sizes=letfd=Socket.fdsin(Reader.create?buf_len:reader_buffer_sizefd,Writer.create?buffer_age_limit?buf_len:writer_buffer_sizefd);;letconnect_sock?socket?interrupt?(timeout=sec10.)(where_to_connect:_Where_to_connect.t)=where_to_connect.remote_address()>>=funaddress->lettimeout=Clock.aftertimeoutinletinterrupt=matchinterruptwith|None->timeout|Someinterrupt->Deferred.any[interrupt;timeout]inletconnect_interruptibles=Socket.connect_interruptiblesaddress~interruptinDeferred.create(funresult->lets=matchsocketwith|Somes->s|None->Socket.createwhere_to_connect.socket_typeinclose_sock_on_errors(fun()->matchwhere_to_connect.local_addresswith|None->connect_interruptibles|Somelocal_interface->Socket.bindslocal_interface>>=funs->connect_interruptibles)>>>function|`Oks->Ivar.fillresults|`Interrupted->don't_wait_for(Unix.close(Socket.fds));letaddress=Socket.Address.to_stringaddressinifOption.is_some(Deferred.peektimeout)thenraise_s[%sexp"connection attempt timeout",(address:string)]elseraise_s[%sexp"connection attempt aborted",(address:string)]);;type'awith_connect_options=?buffer_age_limit:[`At_mostofTime.Span.t|`Unlimited]->?interrupt:unitDeferred.t->?reader_buffer_size:int->?writer_buffer_size:int->?timeout:Time.Span.t->'aletconnect?socket?buffer_age_limit?interrupt?reader_buffer_size?writer_buffer_size?timeoutwhere_to_connect=connect_sock?socket?interrupt?timeoutwhere_to_connect>>|funs->letr,w=reader_writer_of_sock?buffer_age_limit?reader_buffer_size?writer_buffer_sizesins,r,w;;letcollect_errorswriterf=letmonitor=Writer.monitorwriterinignore(Monitor.detach_and_get_error_streammonitor:_Stream.t);(* don't propagate errors up, we handle them here *)choose[choice(Monitor.get_next_errormonitor)(fune->Errore);choice(try_with~name:"Tcp.collect_errors"f)Fn.id];;letclose_connection_via_reader_and_writerrw=Writer.closew~force_close:(Clock.after(sec30.))>>=fun()->Reader.closer;;letwith_connection?buffer_age_limit?interrupt?reader_buffer_size?writer_buffer_size?timeoutwhere_to_connectf=connect_sock?interrupt?timeoutwhere_to_connect>>=funsocket->letr,w=reader_writer_of_sock?buffer_age_limit?reader_buffer_size?writer_buffer_sizesocketinletres=collect_errorsw(fun()->fsocketrw)inDeferred.any[(res>>|fun(_:('a,exn)Result.t)->());Reader.close_finishedr;Writer.close_finishedw]>>=fun()->close_connection_via_reader_and_writerrw>>=fun()->res>>|function|Okv->v|Errore->raisee;;moduleBind_to_address=structtypet=|AddressofUnix.Inet_addr.t|All_addresses|Localhost[@@derivingsexp_of]endmoduleBind_to_port=structtypet=|On_portofint|On_port_chosen_by_os[@@derivingsexp_of]endmoduleWhere_to_listen=structtype('address,'listening_on)t={socket_type:'addressSocket.Type.t;address:'address;listening_on:('address->'listening_on[@sexp.opaque])}[@@derivingsexp_of,fields]typeinet=(Socket.Address.Inet.t,int)t[@@derivingsexp_of]typeunix=(Socket.Address.Unix.t,string)t[@@derivingsexp_of]letcreate~socket_type~address~listening_on={socket_type;address;listening_on}letbind_to(bind_to_address:Bind_to_address.t)(bind_to_port:Bind_to_port.t)=letport=matchbind_to_portwith|On_portport->port|On_port_chosen_by_os->0inletaddress=matchbind_to_addresswith|All_addresses->Socket.Address.Inet.create_bind_any~port|Addressaddr->Socket.Address.Inet.createaddr~port|Localhost->Socket.Address.Inet.createUnix.Inet_addr.localhost~portin{socket_type=Socket.Type.tcp;address;listening_on=(function|`Inet(_,port)->port)};;letof_portport=bind_toAll_addresses(On_portport)letof_port_chosen_by_os=bind_toAll_addressesOn_port_chosen_by_osletof_filepath={socket_type=Socket.Type.unix;address=Socket.Address.Unix.createpath;listening_on=(fun_->path)};;endmoduleServer=structmoduleConnection=structtype'addresst={client_socket:([`Active],'address)Socket.t;client_address:'address}[@@derivingfields,sexp_of]letinvariantinvariant_addresst=Invariant.invariant[%here]t[%sexp_of:_t](fun()->letcheckf=Invariant.check_fieldtfinFields.iter~client_socket:ignore~client_address:(checkinvariant_address));;letcreate~client_socket~client_address={client_socket;client_address}letcloset=Fd.close(Socket.fdt.client_socket)endtype('address,'listening_on)t={socket:([`Passive],'address)Socket.t;listening_on:'listening_on;on_handler_error:[`Raise|`Ignore|`Callof'address->exn->unit];handle_client:'address->([`Active],'address)Socket.t->(unit,exn)Result.tDeferred.t;max_connections:int;max_accepts_per_batch:int;connections:'addressConnection.tBag.t;mutableaccept_is_pending:bool;mutabledrop_incoming_connections:bool;close_finished_and_handlers_determined:unitIvar.t}[@@derivingfields,sexp_of]letnum_connectionst=Bag.lengtht.connectionstype('address,'listening_on,'callback)create_options=?max_connections:int->?max_accepts_per_batch:int->?backlog:int->?socket:([`Unconnected],'address)Socket.t->on_handler_error:[`Raise|`Ignore|`Callof'address->exn->unit]->('address,'listening_on)Where_to_listen.t->'callback->('address,'listening_on)tDeferred.tletlistening_socket=sockettypeinet=(Socket.Address.Inet.t,int)t[@@derivingsexp_of]typeunix=(Socket.Address.Unix.t,string)t[@@derivingsexp_of]letlistening_on_address(t:(_,_)t)=Socket.getsocknamet.socketletinvariantt:unit=tryletcheckffield=f(Field.getfieldt)inFields.iter~socket:ignore~listening_on:ignore~on_handler_error:ignore~handle_client:ignore~max_connections:(check(funmax_connections->assert(max_connections>=1)))~max_accepts_per_batch:(check(funmax_accepts_per_batch->assert(max_accepts_per_batch>=1)))~connections:(check(funconnections->Bag.invariant(Connection.invariantignore)connections;letnum_connections=num_connectionstinassert(num_connections>=0);assert(num_connections<=t.max_connections)))~accept_is_pending:ignore~drop_incoming_connections:ignore~close_finished_and_handlers_determined:ignorewith|exn->failwiths"invariant failed"(exn,t)[%sexp_of:exn*(_,_)t];;letfdt=Socket.fdt.socketletis_closedt=Fd.is_closed(fdt)letclose_finishedt=Fd.close_finished(fdt)letclose_finished_and_handlers_determinedt=Ivar.readt.close_finished_and_handlers_determined;;letclose?(close_existing_connections=false)t=letfd_closed=Fd.close(fdt)inifnotclose_existing_connectionsthenfd_closedelse(* Connections are removed from the bag by the [maybe_accept] below, as the fds are
closed. *)Deferred.all_unit(fd_closed::List.map(Bag.to_listt.connections)~f:Connection.close);;(* [maybe_accept] is a bit tricky, but the idea is to avoid calling [accept] until we
have an available slot (determined by [num_connections < max_connections]). *)letrecmaybe_acceptt=letavailable_slots=t.max_connections-num_connectionstinif(not(is_closedt))&&available_slots>0&¬t.accept_is_pendingthen(t.accept_is_pending<-true;Socket.accept_at_most~limit:(mint.max_accepts_per_batchavailable_slots)t.socket>>>funaccept_result->t.accept_is_pending<-false;matchaccept_resultwith|`Socket_closed->()|`Okconns->(* It is possible that someone called [close t] after the [accept] returned but
before we got here. In that case, we just close the clients. *)ifis_closedt||t.drop_incoming_connectionsthenList.iterconns~f:(fun(sock,_)->don't_wait_for(Fd.close(Socket.fdsock)))else((* We first [handle_client] on all the connections, which increases
[num_connections], and then call [maybe_accept] to try to accept more
clients, which respects the just-increased [num_connections]. *)List.iterconns~f:(fun(sock,addr)->handle_clienttsockaddr);maybe_acceptt))andhandle_clienttclient_socketclient_address=letconnection=Connection.create~client_socket~client_addressinletconnections_elt=Bag.addt.connectionsconnectionint.handle_clientclient_addressclient_socket>>>funres->Connection.closeconnection>>>fun()->Bag.removet.connectionsconnections_elt;ifDeferred.is_determined(close_finishedt)&&num_connectionst=0thenIvar.fill_if_emptyt.close_finished_and_handlers_determined();(matchreswith|Ok()->()|Errore->(trymatcht.on_handler_errorwith|`Ignore->()|`Raise->raisee|`Callf->fclient_addressewith|e->don't_wait_for(closet);raisee));maybe_acceptt;;letcreate_sock_internal?(max_connections=10_000)?(max_accepts_per_batch=1)?backlog?socket~on_handler_error(where_to_listen:_Where_to_listen.t)handle_client=ifmax_connections<=0thenfailwiths"Tcp.Server.creater got negative [max_connections]"max_connectionssexp_of_int;letsocket,should_set_reuseaddr=matchsocketwith|Somesocket->socket,false|None->Socket.createwhere_to_listen.socket_type,trueinclose_sock_on_errorsocket(fun()->Socket.bind~reuseaddr:should_set_reuseaddrsocketwhere_to_listen.address>>|Socket.listen?backlog)>>|funsocket->lett={socket;listening_on=where_to_listen.listening_on(Socket.getsocknamesocket);on_handler_error;handle_client;max_connections;max_accepts_per_batch;connections=Bag.create();accept_is_pending=false;drop_incoming_connections=false;close_finished_and_handlers_determined=Ivar.create()}in(close_finishedt>>>fun()->ifnum_connectionst=0thenIvar.fill_if_emptyt.close_finished_and_handlers_determined());maybe_acceptt;t;;letcreate_sock?max_connections?max_accepts_per_batch?backlog?socket~on_handler_errorwhere_to_listenhandle_client=create_sock_internal?max_connections?max_accepts_per_batch?backlog~on_handler_error?socketwhere_to_listen(funclient_addressclient_socket->try_with~name:"Tcp.Server.create_sock"(fun()->handle_clientclient_addressclient_socket));;letcreate?buffer_age_limit?max_connections?max_accepts_per_batch?backlog?socket~on_handler_errorwhere_to_listenhandle_client=create_sock_internal?max_connections?max_accepts_per_batch?backlog?socket~on_handler_errorwhere_to_listen(funclient_addressclient_socket->letr,w=reader_writer_of_sock?buffer_age_limitclient_socketinWriter.set_raise_when_consumer_leaveswfalse;Deferred.any[collect_errorsw(fun()->handle_clientclient_addressrw);Writer.consumer_leftw|>Deferred.ok]>>=funres->close_connection_via_reader_and_writerrw>>|fun()->res);;modulePrivate=structletfd=fdendendmodulePrivate=structletclose_connection_via_reader_and_writer=close_connection_via_reader_and_writerend