Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file websocket_lwt.ml
(*
* Copyright (c) 2012-2016 Vincent Bernardoff <vb@luminar.eu.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*
*)

include Websocket
open Astring
open Lwt.Infix

module Lwt_IO = IO(Cohttp_lwt_unix.IO)
open Lwt_IO

module Request = Cohttp.Request.Make(Cohttp_lwt_unix.IO)
module Response = Cohttp.Response.Make(Cohttp_lwt_unix.IO)

(** Log section used by every message emitted from this module. *)
let section = Lwt_log.Section.make "websocket_lwt"

(** Raised with a textual reason when the HTTP part of a handshake fails. *)
exception HTTP_Error of string

(** Server-side view of one accepted websocket connection. *)
module Connected_client = struct
  type t = {
    buffer: Buffer.t;                  (* scratch buffer reused for outgoing frames *)
    flow: Conduit_lwt_unix.flow;       (* underlying conduit flow *)
    ic: Request.IO.ic;                 (* input channel of the connection *)
    oc: Request.IO.oc;                 (* output channel of the connection *)
    http_request: Cohttp.Request.t;    (* the HTTP request that opened the connection *)
    standard_frame_replies: bool;      (* when true, [recv] answers Ping/Close itself *)
    read_frame: unit -> Frame.t Lwt.t; (* frame reader built by [make_read_frame] *)
  }

  (** [create ?read_buf ?write_buf http_request flow ic oc] builds a client
      record in [Server] mode. [write_buf] defaults to a fresh 128-byte
      buffer; automatic Ping/Close replies start disabled. *)
  let create
      ?read_buf
      ?(write_buf = Buffer.create 128)
      http_request
      flow ic oc =
    let reader = make_read_frame ?buf:read_buf ~mode:Server ic oc in
    { buffer = write_buf;
      flow;
      ic;
      oc;
      http_request;
      standard_frame_replies = false;
      read_frame = reader;
    }

  (** [send t frame] serialises [frame] into [t]'s buffer (cleared first) and
      writes the resulting bytes to the output channel. *)
  let send t frame =
    Buffer.clear t.buffer;
    write_frame_to_buf ~mode:Server t.buffer frame;
    Lwt_io.write t.oc (Buffer.contents t.buffer)

  (** [send_multiple t frames] serialises all [frames] back to back into
      [t]'s buffer (cleared first) and issues a single write. *)
  let send_multiple t frames =
    Buffer.clear t.buffer;
    List.iter (fun frame -> write_frame_to_buf ~mode:Server t.buffer frame) frames;
    Lwt_io.write t.oc (Buffer.contents t.buffer)

  (** [standard_recv t] reads one frame and, before handing it to the caller,
      replies to it when it is a Ping (with a Pong) or a Close (with a Close
      echoing the first two bytes of the payload, or [Frame.close 1000] when
      the payload is shorter than two bytes). Any other frame is returned
      untouched. *)
  let standard_recv t =
    t.read_frame () >>= fun frame ->
    (* Send [reply], then resolve with the frame that was received. *)
    let answer reply = send t reply >|= fun () -> frame in
    match frame.Frame.opcode with
    | Frame.Opcode.Ping ->
      answer (Frame.create ~opcode:Frame.Opcode.Pong ())
    | Frame.Opcode.Close ->
      (* Echo immediately; the user still receives this last frame. *)
      let payload = frame.Frame.content in
      if String.length payload >= 2 then
        (let status = String.(sub ~start:0 ~stop:2 payload |> Sub.to_string) in
         answer (Frame.create ~opcode:Frame.Opcode.Close ~content:status ()))
      else
        answer (Frame.close 1000)
    | _ ->
      Lwt.return frame

  (** [recv t] reads the next frame, going through [standard_recv] when
      [t.standard_frame_replies] is set and through the raw reader otherwise. *)
  let recv t =
    match t.standard_frame_replies with
    | true -> standard_recv t
    | false -> t.read_frame ()

  (** The HTTP request stored at creation time. *)
  let http_request t = t.http_request

  type source =
    | TCP of Ipaddr.t * int
    | Domain_socket of string
    | Vchan of Conduit_lwt_unix.vchan_flow

  (** [source t] describes the flow behind [t]: IP and port for TCP, the
      socket path for a domain socket, the vchan flow for Vchan. *)
  let source t : source =
    match t.flow with
    | Conduit_lwt_unix.TCP { Conduit_lwt_unix.ip; port; _ } ->
      TCP (ip, port)
    | Conduit_lwt_unix.Domain_socket { Conduit_lwt_unix.path; _ } ->
      Domain_socket path
    | Conduit_lwt_unix.Vchan vchan ->
      Vchan vchan

  (** [make_standard t] is [t] with automatic Ping/Close replies enabled. *)
  let make_standard t = { t with standard_frame_replies = true }
end

(** [set_tcp_nodelay flow] sets [TCP_NODELAY] on the socket of a TCP flow and
    does nothing for any other kind of flow. *)
let set_tcp_nodelay = function
  | Conduit_lwt_unix.TCP { Conduit_lwt_unix.fd; _ } ->
    Lwt_unix.setsockopt fd Lwt_unix.TCP_NODELAY true
  | _ -> ()

(** [check_origin ?origin_mandatory ~hosts request] tells whether the host of
    the request's [origin] header equals, after lowercasing, one of [hosts].

    - No [origin] header: the result is [not origin_mandatory]
      ([origin_mandatory] defaults to [false]).
    - An [origin] value for which [Uri.host] yields nothing: [false]. *)
let check_origin ?(origin_mandatory = false) ~hosts =
  let is_allowed candidate =
    List.exists
      (fun allowed -> String.Ascii.lowercase allowed = candidate)
      hosts
  in
  fun request ->
    match Cohttp.Header.get request.Cohttp.Request.headers "origin" with
    | None -> not origin_mandatory
    | Some raw_origin ->
      (match Uri.host (Uri.of_string raw_origin) with
       | Some origin_host ->
         (* host is already lowercased by Uri *)
         is_allowed origin_host
       | None -> false)

(** [check_origin_with_host request] runs [check_origin] with the request's
    own [host] header (port removed) as the only accepted host.

    Raises [Failure "Missing host header"] when there is no [host] header.

    NOTE(review): the port is removed by cutting at the first [':'], so a
    bracketed IPv6 literal such as ["[::1]:8080"] would be reduced to ["["];
    confirm whether such hosts need to be supported. *)
let check_origin_with_host request =
  match Cohttp.Header.get request.Cohttp.Request.headers "host" with
  | None ->
    (* mandatory in http/1.1 *)
    failwith "Missing host header"
  | Some host ->
    let host_and_port = String.cut ~sep:":" host in
    (* remove port *)
    let hostname = Option.value_map ~default:host ~f:fst host_and_port in
    check_origin ~hosts:[hostname] request

(** [with_connection ?extra_headers ?random_string ?ctx client uri] connects
    to [client], performs the websocket opening handshake for [uri] and
    resolves to a [(read_frame, write_frame)] pair operating in [Client] mode.

    The returned promise fails with:
    - [End_of_file] if the response cannot be read because the input ended;
    - [Failure msg] if the response is reported as invalid;
    - [HTTP_Error status] if the response status is an error code;
    - [Protocol_error "Bad headers"] if the response is not a valid upgrade.

    On any handshake failure the input channel is closed before the failure
    is propagated. *)
let with_connection
    ?(extra_headers = Cohttp.Header.init ())
    ?(random_string = Rng.init ())
    ?(ctx = Conduit_lwt_unix.default_ctx)
    client uri =
  let module C = Cohttp in
  let open_channels () =
    let nonce = random_string 16 |> B64.encode ~pad:true in
    let request_headers =
      C.Header.add_list extra_headers
        [ "Upgrade", "websocket";
          "Connection", "Upgrade";
          "Sec-WebSocket-Key", nonce;
          "Sec-WebSocket-Version", "13";
        ]
    in
    let req = C.Request.make ~headers:request_headers uri in
    Conduit_lwt_unix.connect ~ctx client >>= fun (flow, ic, oc) ->
    set_tcp_nodelay flow;
    let read_response () =
      Response.read ic >>= function
      | `Ok resp -> Lwt.return resp
      | `Eof -> Lwt.fail End_of_file
      | `Invalid msg -> Lwt.fail (Failure msg)
    in
    let handshake () =
      Request.write (fun _writer -> Lwt.return_unit) req oc >>= fun () ->
      read_response () >>= fun response ->
      let status = C.Response.status response in
      let reply_headers = C.Response.headers response in
      if C.Code.is_error (C.Code.code_of_status status) then
        Lwt.fail (HTTP_Error (C.Code.string_of_status status))
      else
        (* The reply must be an HTTP/1.1 "101" upgrade to websocket whose
           accept value matches the one derived from our nonce. *)
        let upgraded =
          C.Response.version response = `HTTP_1_1
          && status = `Switching_protocols
          && Option.map ~f:String.Ascii.lowercase
               (C.Header.get reply_headers "upgrade") = Some "websocket"
          && upgrade_present reply_headers
          && C.Header.get reply_headers "sec-websocket-accept"
             = Some (b64_encoded_sha1sum (nonce ^ websocket_uuid))
        in
        if upgraded then
          Lwt_log.info_f ~section "Connected to %s" (Uri.to_string uri)
        else
          Lwt.fail (Protocol_error "Bad headers")
    in
    Lwt.catch handshake
      (fun exn -> Lwt_io.close ic >>= fun () -> Lwt.fail exn)
    >>= fun () ->
    Lwt.return (ic, oc)
  in
  open_channels () >|= fun (ic, oc) ->
  let raw_read = make_read_frame ~mode:(Client random_string) ic oc in
  let read_frame () = Lwt.catch raw_read (fun exn -> Lwt.fail exn) in
  let scratch = Buffer.create 128 in
  let write_frame frame =
    Buffer.clear scratch;
    Lwt.wrap2 (write_frame_to_buf ~mode:(Client random_string)) scratch frame
    >>= fun () ->
    Lwt_io.write oc (Buffer.contents scratch)
  in
  (read_frame, write_frame)

(** [write_failed_response oc] writes and flushes a [`Forbidden] response
    whose fixed-length body is ["403 Forbidden"]. *)
let write_failed_response oc =
  let body = "403 Forbidden" in
  let encoding = Cohttp.Transfer.Fixed (Int64.of_int (String.length body)) in
  let response = Cohttp.Response.make ~status:`Forbidden ~encoding () in
  Response.write
    ~flush:true
    (fun writer -> Response.write_body writer body)
    response oc

(** [establish_server ... ~mode react] serves websocket connections through
    [Conduit_lwt_unix.serve]. For each connection it sets [TCP_NODELAY],
    reads the HTTP request and validates it (HTTP/1.1, GET, websocket upgrade
    headers, a [sec-websocket-key], and [check_request], which defaults to
    [check_origin_with_host]).

    - Valid request: a [`Switching_protocols] response is written and [react]
      is called with a fresh {!Connected_client.t}.
    - Invalid request: a 403 response is written and the handler fails with
      [Protocol_error "Bad headers"].
    - Unreadable request: the handler fails with [End_of_file] or
      [HTTP_Error reason] after logging. *)
let establish_server
    ?read_buf ?write_buf
    ?timeout ?stop
    ?on_exn
    ?(check_request = check_origin_with_host)
    ?(ctx = Conduit_lwt_unix.default_ctx)
    ~mode react =
  let module C = Cohttp in
  let read_request ic =
    Request.read ic >>= function
    | `Ok req -> Lwt.return req
    | `Eof ->
      (* Remote endpoint closed connection. No further action necessary here. *)
      Lwt_log.info ~section "Remote endpoint closed connection"
      >>= (fun () -> Lwt.fail End_of_file)
    | `Invalid reason ->
      Lwt_log.info_f ~section "Invalid input from remote endpoint: %s" reason
      >>= (fun () -> Lwt.fail (HTTP_Error reason))
  in
  let handle flow ic oc =
    read_request ic >>= fun request ->
    let meth = C.Request.meth request in
    let version = C.Request.version request in
    let headers = C.Request.headers request in
    let key = C.Header.get headers "sec-websocket-key" in
    (* [check_request] comes last so it only runs on otherwise valid requests. *)
    let acceptable =
      version = `HTTP_1_1
      && meth = `GET
      && Option.map ~f:String.Ascii.lowercase
           (C.Header.get headers "upgrade") = Some "websocket"
      && key <> None
      && upgrade_present headers
      && check_request request
    in
    match key with
    | Some key when acceptable ->
      let accept = b64_encoded_sha1sum (key ^ websocket_uuid) in
      let response_headers =
        C.Header.of_list
          [ "Upgrade", "websocket";
            "Connection", "Upgrade";
            "Sec-WebSocket-Accept", accept;
          ]
      in
      let response =
        C.Response.make
          ~status:`Switching_protocols
          ~encoding:C.Transfer.Unknown
          ~headers:response_headers
          ()
      in
      Response.write (fun _writer -> Lwt.return_unit) response oc >>= fun () ->
      react (Connected_client.create ?read_buf ?write_buf request flow ic oc)
    | _ ->
      write_failed_response oc >>= fun () ->
      Lwt.fail (Protocol_error "Bad headers")
  in
  Conduit_lwt_unix.serve ?on_exn ?timeout ?stop ~ctx ~mode
    (fun flow ic oc ->
       set_tcp_nodelay flow;
       handle flow ic oc)

(** [mk_frame_stream recv] is a stream that calls [recv ()] for each element
    and ends as soon as a frame with the Close opcode is received (that frame
    is not part of the stream). *)
let mk_frame_stream recv =
  let next () =
    recv () >>= fun frame ->
    match frame.Frame.opcode with
    | Frame.Opcode.Close -> Lwt.return_none
    | _ -> Lwt.return (Some frame)
  in
  Lwt_stream.from next

(** Same as {!establish_server}, except that [react] receives a client on
    which {!Connected_client.make_standard} has been applied. *)
let establish_standard_server
    ?read_buf ?write_buf
    ?timeout ?stop
    ?on_exn ?check_request
    ?(ctx = Conduit_lwt_unix.default_ctx)
    ~mode react =
  establish_server
    ?read_buf ?write_buf
    ?timeout ?stop
    ?on_exn ?check_request
    ~ctx ~mode
    (fun client -> react (Connected_client.make_standard client))