Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file ocsigen_stream.ml
(* Ocsigen
* ocsigen_stream.ml Copyright (C) 2005 Vincent Balat
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, with linking exception;
* either version 2.1 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*)

open Ocsigen_lib

(** Failure produced by a stream obtained through {!get} when forcing the
    underlying stream failed; carries the original exception. *)
exception Interrupted of exn

(** Failure installed in a stream by {!cancel}. *)
exception Cancelled

(** Raised by {!get} when the stream has already been taken. *)
exception Already_read

(** Failure installed in a stream by {!finalize}. *)
exception Finalized

(** A stream is a lazily computed promise of its next step. *)
type 'a stream = 'a step Lwt.t Lazy.t

and 'a step =
  | Finished of 'a stream option
      (* If there is another stream following (useful for substreams) *)
  | Cont of 'a * 'a stream (* Current buffer, what follows *)

type outcome = [`Success | `Failure]

(** A stream together with its usage flag and its finalizer. *)
type 'a t =
  { mutable stream : 'a stream
  ; mutable in_use : bool
  ; mutable finalizer : outcome -> unit Lwt.t }

(** Maximum buffer size used by [enlarge_stream]; 8192 by default. *)
let net_buffer_size = ref 8192

let set_net_buffer_size i = net_buffer_size := i

(** [empty follow] is a finished step. When [follow] is [Some f], the step
    carries the stream lazily built from [f]. *)
let empty follow =
  match follow with
  | None -> Lwt.return (Finished None)
  | Some st -> Lwt.return (Finished (Some (Lazy.from_fun st)))

(** [cont stri f] is a step holding the chunk [stri], followed by the stream
    lazily built from [f]. *)
let cont stri f = Lwt.return (Cont (stri, Lazy.from_fun f))

(** [make ?finalize f] wraps the generator [f] into a fresh, unused stream.
    The default finalizer does nothing. *)
let make ?finalize:(g = fun _ -> Lwt.return ()) f =
  {stream = Lazy.from_fun f; in_use = false; finalizer = g}

(** Force a stream to obtain (a promise of) its next step. *)
let next = Lazy.force

(* Stream view of [st] that keeps [st.stream] pointing at the part not yet
   read.  If forcing fails, the failure is stored back into [st.stream] and
   reported to the reader wrapped in [Interrupted]. *)
let rec get_aux st =
  lazy
    (Lwt.try_bind
       (fun () -> Lazy.force st.stream)
       (fun e ->
         Lwt.return
           (match e with
           | Cont (s, rem) ->
               st.stream <- rem;
               Cont (s, get_aux st)
           | _ -> e))
       (fun e ->
         st.stream <- lazy (Lwt.fail e);
         Lwt.fail (Interrupted e)))

(** [get st] marks [st] as in use and returns its stream.
    @raise Already_read if [st] is already marked as in use. *)
let get st =
  if st.in_use then raise Already_read;
  st.in_use <- true;
  get_aux st

(** read the stream until the end, without decoding *)
let rec consume_aux st =
  next st >>= fun e ->
  match e with
  | Cont (_, f) -> consume_aux f
  | Finished (Some ss) -> consume_aux ss
  | Finished None -> Lwt.return ()

(** [cancel st] replaces the stream of [st] by one failing with
    {!Cancelled}, then reads the previous stream until its end. *)
let cancel st =
  let st' = st.stream in
  st.stream <- lazy (Lwt.fail Cancelled);
  consume_aux st'

(** [consume st] reads the current stream of [st] until its end. *)
let consume st = consume_aux st.stream

(** [finalize st status] runs the finalizer of [st] with [status].  The
    finalizer is replaced by a no-op before being run; once it has completed,
    the stream of [st] is replaced by one failing with {!Finalized}. *)
let finalize st status =
  let f = st.finalizer in
  st.finalizer <- (fun _ -> Lwt.return ());
  f status >>= fun () ->
  st.stream <- lazy (Lwt.fail Finalized);
  Lwt.return ()

(** [add_finalizer st g] makes [g] run after the finalizer already
    registered on [st], with the same status. *)
let add_finalizer st g =
  let f = st.finalizer in
  st.finalizer <- (fun status -> f status >>= fun () -> g status)

(****)

(** String streams
*)

exception Stream_too_small
exception Stream_error of string
exception String_too_large

(** [string_of_stream m s] concatenates the chunks of [s] up to its first
    [Finished] step.  Fails with {!String_too_large} as soon as the
    accumulated length exceeds [m]. *)
let string_of_stream m s =
  let buff = Buffer.create (m / 4) in
  let rec aux i s =
    next s >>= function
    | Finished _ -> Lwt.return buff
    | Cont (s, f) ->
        let i = i + String.length s in
        if i > m
        then Lwt.fail String_too_large
        else (
          Buffer.add_string buff s;
          aux i f)
  in
  aux 0 s >|= Buffer.contents

(** [enlarge_stream step] appends the next chunk to the current buffer.
    - Fails with {!Stream_too_small} if [step], or the step following it, is
      [Finished].
    - Fails with [Input_is_too_large] (not defined in this file; presumably
      provided by the opened [Ocsigen_lib]) if the current buffer already
      holds [!net_buffer_size] bytes or more.
    - If the concatenation exceeds [!net_buffer_size], the result is split:
      the first [!net_buffer_size] bytes, followed by a step holding the
      rest. *)
let enlarge_stream = function
  | Finished _a -> Lwt.fail Stream_too_small
  | Cont (s, f) -> (
      let long = String.length s in
      let max = !net_buffer_size in
      if long >= max
      then Lwt.fail Input_is_too_large
      else
        next f >>= fun e ->
        match e with
        | Finished _ -> Lwt.fail Stream_too_small
        | Cont (r, ff) ->
            let long2 = String.length r in
            let long3 = long + long2 in
            let new_s = s ^ r in
            if long3 <= max
            then Lwt.return (Cont (new_s, ff))
            else
              let long4 = long3 - max in
              cont (String.sub new_s 0 max) (fun () ->
                  Lwt.return (Cont (String.sub new_s max long4, ff))))

let rec stream_want s len =
  (* returns a stream with at least len bytes read if possible *)
  match s with
  | Finished _ -> Lwt.return s
  | Cont (stri, _f) ->
      if String.length stri >= len
      then Lwt.return s
      else
        Lwt.catch
          (fun () -> enlarge_stream s >>= fun r -> Lwt.return (`OK r))
          (function
            (* Not enough data left: give back what we have. *)
            | Stream_too_small -> Lwt.return `Too_small
            | e -> Lwt.fail e)
        >>= (function
        | `OK r -> stream_want r len
        | `Too_small -> Lwt.return s)

(** Current buffer of a step.
    @raise Stream_too_small (synchronously) on a [Finished] step. *)
let current_buffer = function
  | Finished _ -> raise Stream_too_small
  | Cont (s, _) -> s

(** [skip s k] drops the first [k] bytes (an [int64]) of [s].
    On a [Finished] step, {!Stream_too_small} is raised with [raise], not
    returned as a failed promise. *)
let rec skip s k =
  match s with
  | Finished _ -> raise Stream_too_small
  | Cont (s, f) ->
      let len = String.length s in
      let len64 = Int64.of_int len in
      if Int64.compare k len64 <= 0
      then (
        let k = Int64.to_int k in
        Lwt.return (Cont (String.sub s k (len - k), f)))
      else
        (* The whole buffer is skipped: fetch the next chunk and go on with
           the remaining count. *)
        enlarge_stream (Cont ("", f)) >>= fun s -> skip s (Int64.sub k len64)

(** [substream delim s] returns the part of [s] that precedes the first
    occurrence of [delim].  Its final step is [Finished (Some rest)], where
    [rest] starts at the delimiter.
    Fails with [Stream_error] if [delim] is empty, and with
    {!Stream_too_small} if [s] ends before [delim] is found. *)
let substream delim s =
  let ldelim = String.length delim in
  if ldelim = 0
  then Lwt.fail (Stream_error "Empty delimiter")
  else
    let rdelim = Re.Pcre.(regexp (quote delim)) in
    let rec aux = function
      | Finished _ -> Lwt.fail Stream_too_small
      | Cont (s, f) as stre -> (
          let len = String.length s in
          if len < ldelim
          then enlarge_stream stre >>= aux
          else
            try
              let p, _ =
                Ocsigen_lib.Netstring_pcre.search_forward rdelim s 0
              in
              cont (String.sub s 0 p) (fun () ->
                  empty
                    (Some
                       (fun () ->
                         Lwt.return (Cont (String.sub s p (len - p), f)))))
            with Not_found ->
              (* Keep the last [ldelim - 1] bytes: they may be the beginning
                 of a delimiter completed by the next chunk. *)
              let pos = len + 1 - ldelim in
              cont (String.sub s 0 pos) (fun () ->
                  next f >>= function
                  | Finished _ -> Lwt.fail Stream_too_small
                  | Cont (s', f') ->
                      aux (Cont (String.sub s pos (len - pos) ^ s', f'))))
    in
    aux s

(*****************************************************************************)

(*VVV Is it the good place for this? *)

(** [of_file filename] opens [filename] read-only and returns a stream of its
    contents, read in chunks of at most 1024 bytes.  The finalizer of the
    stream closes the file descriptor. *)
let of_file filename =
  let fd =
    Lwt_unix.of_unix_file_descr
      (Unix.openfile filename [Unix.O_RDONLY; Unix.O_NONBLOCK] 0o666)
  in
  let ch = Lwt_io.(of_fd ~mode:input) fd in
  let buf = Bytes.create 1024 in
  let rec aux () =
    Lwt_io.read_into ch buf 0 1024 >>= fun n ->
    if n = 0
    then empty None
    else
      (* Streams should be immutable, thus we always make a copy
         of the buffer *)
      cont (Bytes.sub_string buf 0 n) aux
  in
  make ~finalize:(fun _ -> Lwt_unix.close fd) aux

(** [of_string s] is a stream made of the single chunk [s]. *)
let of_string s = make (fun () -> cont s (fun () -> empty None))

(** Convert a {!Lwt_stream.t} to an {!Ocsigen_stream.t}. *)
let of_lwt_stream stream =
  let rec aux () =
    Lwt_stream.get stream >>= function
    | Some e -> cont e aux
    | None -> empty None
  in
  make aux

(** Convert an {!Ocsigen_stream.t} into a {!Lwt_stream.t}.
@param is_empty predicate on chunks; a chunk for which it returns [true]
is skipped. By default no chunk is skipped.
*)
let to_lwt_stream ?(is_empty = fun _ -> false) o_stream =
  let stream = ref (get o_stream) in
  let rec wrap () =
    next !stream >>= function
    | Finished None ->
        (* NOTE(review): the [finalizer] field is called directly, not
           through [finalize], so the stored finalizer is left in place --
           confirm that nothing calls [finalize] on the same stream later. *)
        o_stream.finalizer `Success >>= fun () -> Lwt.return None
    | Finished (Some next) ->
        (* Move on to the stream that follows. *)
        stream := next;
        wrap ()
    | Cont (value, next) ->
        stream := next;
        if is_empty value then wrap () else Lwt.return (Some value)
  in
  Lwt_stream.from wrap

(** Builder for string streams.  A value of type [m] is a lazy function
    taking the stream that follows and returning the first step of the
    resulting stream. *)
module StringStream = struct
  type out = string t
  type m = (string stream -> string step Lwt.t) Lazy.t

  (** Adds nothing: the result is the stream that follows. *)
  let empty : m = lazy (fun c -> Lazy.force c)

  (** [concat m f] is [m] followed by [f]. *)
  let concat (m : m) (f : m) : m =
    lazy (fun c -> Lazy.force m (lazy (Lazy.force f c)))

  (** [put s] emits the chunk [s], then the stream that follows. *)
  let put (s : string) : m = lazy (fun c -> Lwt.return (Cont (s, c)))

  (** Close [m] with a final [Finished None] step. *)
  let make_stream (m : m) : string stream =
    lazy (Lazy.force m (lazy (Lwt.return (Finished None))))

  (** Wrap [m] into a stream, using the enclosing module's [make] with its
      default finalizer. *)
  let make (m : m) : out = make (fun () -> Lazy.force (make_stream m))
end