Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file system.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197(* Copyright (C) 2023--2024 Petter A. Urkedal <paurkedal@gmail.com>
*
* This library is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version, with the LGPL-3.0 Linking Exception.
*
* This library is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* and the LGPL-3.0 Linking Exception along with this library. If not, see
* <http://www.gnu.org/licenses/> and <https://spdx.org>, respectively.
*)leterror_msgffmt=Format.kasprintf(funmsg->Error(`Msgmsg))fmtmoduletypeFLOW=Caqti_platform.System_sig.SOCKET_OPSwithtype'afiber='atypeocaml=|andsystem=|type'aimpl=|OCaml:(moduleFLOWwithtypet='a)*'a->ocamlimpl|System:Buffer.t*Miou_unix.file_descr->systemimpltypesocket=Socket:'aimpl->socket[@@unboxed]typeCaqti_error.msg+=|Msg_unixofUnix.error*string*stringlet()=letppppf=function|Msg_unix(err,f,v)->Format.fprintfppf"%s(%s): %s"fv(Unix.error_messageerr)|_->assertfalseinCaqti_error.define_msg~pp[%extension_constructorMsg_unix]externalreraise:exn->'a="%reraise"moduleSystem_core=structincludeCaqti_miou.System_coretypestdenv=unitendincludeSystem_coremoduleAlarm=structtypet=Miou.Condition.t*Miou.Mutex.tletschedule~sw~stdenv:_tfn=lett_now=Mtime_clock.now()inletmutex=Miou.Mutex.create()andcondition=Miou.Condition.create()inletdelay=ifMtime.is_latert~than:t_nowthen0.0elseMtime.Span.to_float_ns(Mtime.spantt_now)*.1e-9inLogs.debug(funm->m"schedule an alarm");let_=async~sw@@fun()->Logs.debug(funm->m"really schedule an alarm");letsleeper=Miou.async@@fun()->Logs.debug(funm->m"Sleep %fs"delay);Miou_unix.sleepdelay;Logs.debug(funm->m"Ring the alarm");`Continueinletcanceller=Miou.async@@fun()->Miou.Condition.waitconditionmutex;`CancelinmatchMiou.await_first[sleeper;canceller]with|Ok`Continue->fn()|Ok`Cancel->()|Error_exn->()in(condition,mutex)letunschedule(condition,mutex)=Miou.Mutex.protectmutex@@fun()->Miou.Condition.signalconditionendmoduleStream=Caqti_miou.StreammodulePool=Caqti_platform.Pool.Make(System_core)(Alarm)moduleNet=structmoduleSockaddr=structtypet=Unix.sockaddrletunixv=Unix.ADDR_UNIXvlettcp(addr,port)=Unix.ADDR_INET(Ipaddr_unix.to_inet_addraddr,port)endletgetaddrinfo~stdenv:()hostport=letopts=Unix.[AI_SOCKTYPESOCK_STREAM]inmatchUnix.getaddrinfo(Domain_name.to_stringhost)(string_of_intport)optswith|lst->Ok(List.map(funai->ai.Unix.ai_addr)lst)|exceptionNot_found->Ok[]|exceptionUnix.Unix_error(err,f,v)->error_msgf"%s(%s): %s"fv(Unix.error_messageerr)letconvert_io_exception=function|Unix.Unix_error(err,f,v)->Some(Msg_unix(err,f,v))|_->Nonetypetcp_flow=Miou_unix.file_descrtypetls_flow=ocamlimplmoduleSocket=structtypet=socketletoutput_char(Socketimpl)chr=matchimplwith|System(buf,_)->Buffer.add_charbufchr|OCaml((moduleFlow),fd)->Flow.output_charfdchrletoutput_string(Socketimpl)str=matchimplwith|System(buf,_)->Buffer.add_stringbufstr|OCaml((moduleFlow),fd)->Flow.output_stringfdstrletflush(Socketimpl)=matchimplwith|System(buf,fd)->letstr=Buffer.contentsbufinBuffer.clearbuf;ifString.lengthstr>0thenMiou_unix.writefdstr|OCaml((moduleFlow),fd)->Flow.flushfdletinput_char(Socketimpl)=matchimplwith|System(_,fd)->letbuf=Bytes.make1'\000'inletlen=Miou_unix.readfdbufiniflen=0thenraiseEnd_of_fileelseBytes.getbuf0|OCaml((moduleFlow),fd)->Flow.input_charfdletreally_input(Socketimpl)bufofflen=matchimplwith|System(_,fd)->letrecgoofflen=iflen>0thenletlen'=Miou_unix.readfdbuf~off~leningo(off+len')(len-len')ingoofflen|OCaml((moduleFlow),fd)->Flow.really_inputfdbufofflenletclose=function|Socket(System(_,fd))->Miou_unix.closefd|Socket(OCaml((moduleFlow),fd))->Flow.closefdendletsocket=function|Unix.ADDR_UNIX_->letfd=Unix.socket~cloexec:trueUnix.PF_UNIXUnix.SOCK_STREAM0inOk(Miou_unix.of_file_descr~non_blocking:truefd)|Unix.ADDR_INET(inet_addr,_)whenUnix.is_inet6_addrinet_addr->Ok(Miou_unix.tcpv6())|_->Ok(Miou_unix.tcpv4())letconnect_tcp~sw:_~stdenv:_sockaddr=let(>>=)=Result.bindinsocketsockaddr>>=funsocket->matchMiou_unix.connectsocketsockaddrwith|()->Ok(Socket(System(Buffer.create0x7ff,socket)))|exceptionUnix.Unix_error(err,f,v)->Miou_unix.closesocket;Error(Msg_unix(err,f,v))|exceptionexn->Miou_unix.closesocket;raiseexnlettcp_flow_of_socket(Socketimpl)=matchimplwith|System(_,fd)->Somefd|OCaml_->Noneletsocket_of_tls_flow:sw:_->tls_flow->Socket.t=fun~sw:_->function|OCaml_asimpl->SocketimplmoduletypeTLS_PROVIDER=Caqti_platform.System_sig.TLS_PROVIDERwithtype'afiber:='aandtypetcp_flow:=tcp_flowandtypetls_flow:=tls_flowlettls_providers_r:(moduleTLS_PROVIDER)listref=ref[]letregister_tls_providerp=tls_providers_r:=p::!tls_providers_rlettls_providersconfig=ifCaqti_connect_config.mem_name"tls"configthenbeginmatchCaqti_platform.Connector.load_library"caqti-tls-miou"with|Ok()->()|Errormsg->Log.warn(funm->m"TLS configured, but missing caqti-tls-miou: %s"msg)end;!tls_providers_rend