Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file lwt_stream.ml
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

open Lwt.Infix

(* [Closed]: raised/failed with when pushing onto a stream that was closed. *)
exception Closed

(* [Full]: a bounded push was attempted while another push is still pending. *)
exception Full

(* [Empty]: an element was requested from a stream that has none left. *)
exception Empty

(* One cell of the singly linked queue holding a stream's pending data. *)
type 'a node = {
  mutable next : 'a node;
  (* Successor cell. The terminal cell is its own successor. *)

  mutable data : 'a option;
  (* Payload of the cell. The terminal cell always carries [None]. *)
}

(* Note: a queue for an exhausted stream is represented by a node
containing [None] followed by a node with itself as next and [None]
   as data. *)

(* Allocate a fresh terminal cell: it is its own successor and holds no
   data. *)
let new_node () =
  let rec sentinel = { next = sentinel; data = None } in
  sentinel

(* Source backed by a generator function returning promises. *)
type 'a from = {
  from_create : unit -> 'a option Lwt.t;
  (* Generator called to obtain the next element. *)

  mutable from_thread : unit Lwt.t;
  (* Thread that waits for the most recent [from_create] call and appends
     its result to the queue. While this thread is still sleeping it is
     reused instead of calling [from_create] again. *)
}

(* Source backed by an unbounded push function. *)
type push = {
  mutable push_signal : unit Lwt.t;
  (* Resolved when a new element is pushed onto the stream. *)

  mutable push_waiting : bool;
  (* [true] while some reader is waiting on [push_signal]. *)

  mutable push_external : Obj.t;
  (* Reference to an external source. *)
}

(* Source backed by a bounded push object. *)
type 'a push_bounded = {
  mutable pushb_signal : unit Lwt.t;
  (* Resolved when a new element is pushed onto the stream. *)

  mutable pushb_waiting : bool;
  (* [true] while some reader is waiting on [pushb_signal]. *)

  mutable pushb_size : int;
  (* Capacity of the queue. *)

  mutable pushb_count : int;
  (* Number of elements currently counted against the capacity. *)

  mutable pushb_pending : 'a option;
  (* Element held back by a blocked push. It is kept here so that it is
     the first one enqueued once room becomes available. *)

  mutable pushb_push_waiter : unit Lwt.t;
  mutable pushb_push_wakener : unit Lwt.u;
  (* Promise (and its resolver) on which a blocked push sleeps. *)

  mutable pushb_external : Obj.t;
  (* Reference to an external source. *)
}

(* The different ways a stream can obtain its elements. *)
type 'a source =
  | From of 'a from
  | From_direct of (unit -> 'a option)
  | Push of push
  | Push_bounded of 'a push_bounded

type 'a t = {
  source : 'a source;
  (* Where the elements come from. *)

  close : unit Lwt.u;
  (* Resolver of the promise that is fulfilled when the stream closes. *)

  mutable node : 'a node;
  (* First pending cell, or [!last] when nothing is pending. *)

  last : 'a node ref;
  (* Terminal cell of the queue of pending elements. *)
}

(* Interface of the pusher object returned for bounded-push streams. *)
class type ['a] bounded_push = object
  method size : int
  method resize : int -> unit
  method push : 'a -> unit Lwt.t
  method close : unit
  method count : int
  method blocked : bool
  method closed : bool
  method set_reference : 'a. 'a -> unit
end

(* The only difference between two clones is the pointer to the first
pending element. *)
let clone s =
  match s.source with
  | Push_bounded _ ->
    (* Bounded-push streams cannot be cloned. *)
    invalid_arg "Lwt_stream.clone"
  | From _ | From_direct _ | Push _ ->
    { source = s.source; close = s.close; node = s.node; last = s.last }

(* Build an empty stream around [source]: the queue starts as a single
   terminal cell and the close promise is still pending. *)
let from_source source =
  let sentinel = new_node () in
  let _, close = Lwt.wait () in
  { source; close; node = sentinel; last = ref sentinel }

(* Stream whose elements are produced by the promise-returning [f]. *)
let from f =
  from_source (From { from_create = f; from_thread = Lwt.return_unit })

(* Stream whose elements are produced synchronously by [f]. *)
let from_direct f =
  from_source (From_direct f)

(* Promise that is resolved when [s] is closed. *)
let closed s =
  (Lwt.waiter_of_wakener [@ocaml.warning "-3"]) s.close

(* [true] once the close promise of [s] is no longer sleeping. *)
let is_closed s =
  closed s |> Lwt.is_sleeping |> not

(* Run [f] asynchronously once [s] is closed. *)
let on_termination s f =
  Lwt.async (fun () -> closed s >|= f)

let on_terminate = on_termination

(* Store [e] in the current terminal cell and append a fresh terminal cell
   after it. *)
let enqueue' e last =
  let tail = !last
  and fresh_tail = new_node () in
  tail.data <- e;
  tail.next <- fresh_tail;
  last := fresh_tail

(* Same as [enqueue'], taking the queue from the stream [s]. *)
let enqueue e s =
  enqueue' e s.last

(* Create a push stream. Returns the stream, its push function, and a
   function storing an arbitrary value in the source's [push_external]
   field. *)
let create_with_reference () =
  (* Source record plus a cell holding the resolver of the current
     notification promise. *)
  let source, resolver_cell =
    let push_signal, resolver = Lwt.wait () in
    let source =
      { push_signal; push_waiting = false; push_external = Obj.repr () }
    in
    (source, ref resolver)
  in
  let t = from_source (Push source) in
  (* [push] must not close over [t], so that the stream can be garbage
     collected even if there are still references to [push]. Extract the
     components of [t] that [push] needs and refer to those instead. *)
  let close = t.close
  and last = t.last in
  (* The push function. It holds no reference to the stream itself. *)
  let push x =
    let waiter_of_wakener = Lwt.waiter_of_wakener [@ocaml.warning "-3"] in
    if not (Lwt.is_sleeping (waiter_of_wakener close)) then
      raise Closed;
    (* Append the element to the queue. *)
    enqueue' x last;
    (* Notify a reader, if one is waiting for a new element. *)
    if source.push_waiting then begin
      source.push_waiting <- false;
      (* Install a fresh notification promise before waking the old one. *)
      let previous_resolver = !resolver_cell in
      let next_signal, next_resolver = Lwt.wait () in
      source.push_signal <- next_signal;
      resolver_cell := next_resolver;
      Lwt.wakeup_later previous_resolver ()
    end;
    (* Done last, in case one of the calls above raises an exception. *)
    match x with
    | None -> Lwt.wakeup close ()
    | Some _ -> ()
  in
  let set_reference x = source.push_external <- Obj.repr x in
  (t, push, set_reference)

(* Stream reading its elements from the sequence [s]. *)
let of_seq s =
  let rest = ref s in
  let pull () =
    match !rest () with
    | Seq.Nil ->
      None
    | Seq.Cons (elt, tail) ->
      rest := tail;
      Some elt
  in
  from_direct pull

(* Push stream without the external-reference setter. *)
let create () =
  let stream, push, _ = create_with_reference () in
  (stream, push)

(* Push every element visited by [iter] over [i], then close the stream. *)
let of_iter iter i =
  let stream, push = create () in
  iter (fun elt -> push (Some elt)) i;
  push None;
  stream

let of_list l = of_iter List.iter l

let of_array a = of_iter Array.iter a

let of_string s = of_iter String.iter s

(* Add the pending element to the queue and notify the blocked pusher.
Precondition: info.pushb_pending = Some _
   This does not modify info.pushb_count. *)
let notify_pusher info last =
  (* Move the held-back element into the queue. *)
  enqueue' info.pushb_pending last;
  info.pushb_pending <- None;
  (* Install a fresh waiter/wakener pair, then wake the blocked pusher
     through the previous wakener. *)
  let previous_wakener = info.pushb_push_wakener in
  let waiter, wakener = Lwt.task () in
  info.pushb_push_waiter <- waiter;
  info.pushb_push_wakener <- wakener;
  Lwt.wakeup_later previous_wakener ()

(* Pusher object for bounded-push streams.
   [info] is the shared source record, [wakener_cell] holds the resolver of
   the current reader notification, [last] is the queue's terminal-cell
   reference and [close] the resolver of the stream's close promise. *)
class ['a] bounded_push_impl (info : 'a push_bounded) wakener_cell last close =
  object
    val mutable closed = false

    method size =
      info.pushb_size

    (* Change the capacity. If the new capacity leaves room and a push is
       blocked, its element is enqueued right away. *)
    method resize size =
      if size < 0 then invalid_arg "Lwt_stream.bounded_push#resize";
      info.pushb_size <- size;
      if info.pushb_count < info.pushb_size && info.pushb_pending <> None
      then begin
        info.pushb_count <- info.pushb_count + 1;
        notify_pusher info last
      end

    (* Push [x]. Fails with [Closed] after [close], with [Full] when another
       push is already blocked, and blocks when the queue is at capacity. *)
    method push x =
      if closed then
        Lwt.fail Closed
      else if info.pushb_pending <> None then
        Lwt.fail Full
      else if info.pushb_count >= info.pushb_size then begin
        (* No room: park the element and sleep until it is taken. *)
        info.pushb_pending <- Some x;
        Lwt.catch
          (fun () -> info.pushb_push_waiter)
          (function
            | Lwt.Canceled as exn ->
              (* The blocked push was cancelled: drop the parked element
                 and install a fresh waiter for the next push. *)
              info.pushb_pending <- None;
              let waiter, wakener = Lwt.task () in
              info.pushb_push_waiter <- waiter;
              info.pushb_push_wakener <- wakener;
              Lwt.fail exn
            | exn ->
              Lwt.fail exn)
      end
      else begin
        (* Room available: append the element to the queue. *)
        enqueue' (Some x) last;
        info.pushb_count <- info.pushb_count + 1;
        (* Notify a reader, if one is waiting for a new element. *)
        if info.pushb_waiting then begin
          info.pushb_waiting <- false;
          let previous_wakener = !wakener_cell in
          let next_signal, next_wakener = Lwt.wait () in
          info.pushb_signal <- next_signal;
          wakener_cell := next_wakener;
          Lwt.wakeup_later previous_wakener ()
        end;
        Lwt.return_unit
      end

    (* Close the stream (only the first call has any effect). *)
    method close =
      if not closed then begin
        closed <- true;
        (* Append the end-of-stream marker to the queue. *)
        enqueue' None last;
        (* A blocked push is dropped and fails with [Closed]. *)
        if info.pushb_pending <> None then begin
          info.pushb_pending <- None;
          Lwt.wakeup_later_exn info.pushb_push_wakener Closed
        end;
        (* Notify a reader, if one is waiting for a new element. *)
        if info.pushb_waiting then begin
          info.pushb_waiting <- false;
          let previous_wakener = !wakener_cell in
          Lwt.wakeup_later previous_wakener ()
        end;
        Lwt.wakeup close ()
      end

    method count =
      info.pushb_count

    method blocked =
      info.pushb_pending <> None

    method closed =
      closed

    method set_reference : 'a. 'a -> unit =
      fun x -> info.pushb_external <- Obj.repr x
  end

(* Create a bounded-push stream of capacity [size] together with its pusher
   object. Raises [Invalid_argument] when [size] is negative. *)
let create_bounded size =
  if size < 0 then invalid_arg "Lwt_stream.create_bounded";
  (* Source record plus a cell holding the resolver of the current reader
     notification. *)
  let info, wakener_cell =
    let signal, signal_wakener = Lwt.wait () in
    let push_waiter, push_wakener = Lwt.task () in
    let info =
      {
        pushb_signal = signal;
        pushb_waiting = false;
        pushb_size = size;
        pushb_count = 0;
        pushb_pending = None;
        pushb_push_waiter = push_waiter;
        pushb_push_wakener = push_wakener;
        pushb_external = Obj.repr ();
      }
    in
    (info, ref signal_wakener)
  in
  let t = from_source (Push_bounded info) in
  (t, new bounded_push_impl info wakener_cell t.last t.close)

(* Wait for a new element to be added to the queue of pending element
of the stream. *)
let feed s =
  (* Append [x] to the queue; [None] additionally resolves the close
     promise. *)
  let deliver x =
    enqueue x s;
    if x = None then Lwt.wakeup s.close ()
  in
  match s.source with
  | From from ->
    if Lwt.is_sleeping from.from_thread then
      (* An element is already being produced: wait for that thread
         rather than starting another one. *)
      Lwt.protected from.from_thread
    else begin
      (* Otherwise request a new element. *)
      let producer =
        from.from_create () >>= fun x ->
        deliver x;
        Lwt.return_unit
      in
      (* Publish the thread so that other readers can wait on it. *)
      from.from_thread <- producer;
      Lwt.protected producer
    end
  | From_direct f ->
    deliver (f ());
    Lwt.return_unit
  | Push push ->
    (* Record that a reader is waiting, then sleep on the signal. *)
    push.push_waiting <- true;
    Lwt.protected push.push_signal
  | Push_bounded push ->
    push.pushb_waiting <- true;
    Lwt.protected push.pushb_signal

(* Remove [node] from the top of the queue, or do nothing if it was
already consumed.
   Precondition: node.data <> None
*)
let consume s node =
  if node == s.node then begin
    s.node <- node.next;
    match s.source with
    | Push_bounded info ->
      (* For bounded streams, removing an element either frees one slot or
         lets the blocked pusher's element in (count unchanged). *)
      if info.pushb_pending = None then
        info.pushb_count <- info.pushb_count - 1
      else
        notify_pusher info s.last
    | From _ | From_direct _ | Push _ ->
      ()
  end

(* Convention for every [*_rec] helper below: [node] is the cursor into the
   queue. When the cursor is the terminal cell, [feed] is used to obtain
   more data and the helper retries on the same cursor; a cell carrying
   [None] marks the end of the stream. *)

(* Return the data at the cursor without consuming it. *)
let rec peek_rec s node =
  if node == !(s.last) then
    feed s >>= fun () -> peek_rec s node
  else
    Lwt.return node.data

let peek s = peek_rec s s.node

(* Return up to [n] elements without consuming them. *)
let rec npeek_rec node acc n s =
  if n <= 0 then
    Lwt.return (List.rev acc)
  else if node == !(s.last) then
    feed s >>= fun () -> npeek_rec node acc n s
  else
    match node.data with
    | Some x ->
      npeek_rec node.next (x :: acc) (n - 1) s
    | None ->
      Lwt.return (List.rev acc)

let npeek n s = npeek_rec s.node [] n s

(* Return the data at the cursor, consuming it unless it is the
   end-of-stream marker. *)
let rec get_rec s node =
  if node == !(s.last) then
    feed s >>= fun () -> get_rec s node
  else begin
    if node.data <> None then consume s node;
    Lwt.return node.data
  end

let get s = get_rec s s.node

type 'a result =
  | Value of 'a
  | Error of exn

(* Like [get_rec], but a failure of [feed] is returned as an [Error]
   element instead of failing the promise.

   Note: the [Error] constructor used here is from [Lwt_stream.result], not
   [Pervasives.result], nor its alias [Lwt.result]. [Lwt_stream.result] is
   a deprecated type, defined right above this function.

   The type constraint is necessary to avoid a warning about an ambiguous
   constructor. *)
let rec get_exn_rec s node =
  if node == !(s.last) then
    Lwt.try_bind
      (fun () -> feed s)
      (fun () -> get_exn_rec s node)
      (fun exn -> Lwt.return (Some (Error exn : _ result)))
  else
    match node.data with
    | Some value ->
      consume s node;
      Lwt.return (Some (Value value))
    | None ->
      Lwt.return_none

let map_exn s = from (fun () -> get_exn_rec s s.node)

(* Same as [get_exn_rec], using the standard [Result.t] type. *)
let rec get_exn_rec' s node =
  if node == !(s.last) then
    Lwt.try_bind
      (fun () -> feed s)
      (fun () -> get_exn_rec' s node)
      (fun exn -> Lwt.return (Some (Result.Error exn)))
  else
    match node.data with
    | Some value ->
      consume s node;
      Lwt.return (Some (Result.Ok value))
    | None ->
      Lwt.return_none

let wrap_exn s = from (fun () -> get_exn_rec' s s.node)

(* Consume and return up to [n] elements.
   The cursor's own data is inspected ([node.data]), as in every other
   traversal of this file; looking at [s.node] instead gives a wrong
   element whenever the stream head has moved away from the cursor (for
   example when another reader consumed elements while this one was
   waiting in [feed]). *)
let rec nget_rec node acc n s =
  if n <= 0 then
    Lwt.return (List.rev acc)
  else if node == !(s.last) then
    feed s >>= fun () -> nget_rec node acc n s
  else
    match node.data with
    | Some x ->
      consume s node;
      nget_rec node.next (x :: acc) (n - 1) s
    | None ->
      Lwt.return (List.rev acc)

let nget n s = nget_rec s.node [] n s

(* Consume and return the longest prefix whose elements satisfy [f]. *)
let rec get_while_rec node acc f s =
  if node == !(s.last) then
    feed s >>= fun () -> get_while_rec node acc f s
  else
    match node.data with
    | Some x ->
      let test = f x in
      if test then begin
        consume s node;
        get_while_rec node.next (x :: acc) f s
      end else
        Lwt.return (List.rev acc)
    | None ->
      Lwt.return (List.rev acc)

let get_while f s = get_while_rec s.node [] f s

(* Same as [get_while_rec] with a promise-returning predicate. *)
let rec get_while_s_rec node acc f s =
  if node == !(s.last) then
    feed s >>= fun () -> get_while_s_rec node acc f s
  else
    match node.data with
    | Some x -> begin
        f x >>= function
        | true ->
          consume s node;
          get_while_s_rec node.next (x :: acc) f s
        | false ->
          Lwt.return (List.rev acc)
      end
    | None ->
      Lwt.return (List.rev acc)

let get_while_s f s = get_while_s_rec s.node [] f s

(* Consume and return the next element; fails with [Empty] at the end of
   the stream. *)
let rec next_rec s node =
  if node == !(s.last) then
    feed s >>= fun () -> next_rec s node
  else
    match node.data with
    | Some x ->
      consume s node;
      Lwt.return x
    | None ->
      Lwt.fail Empty

let next s = next_rec s s.node

(* Consume every element obtainable without sleeping and return the last
   one seen, [x] being the most recent so far. *)
let rec last_new_rec node x s =
  if node == !(s.last) then
    let thread = feed s in
    match Lwt.state thread with
    | Lwt.Return _ ->
      last_new_rec node x s
    | Lwt.Fail exn ->
      Lwt.fail exn
    | Lwt.Sleep ->
      Lwt.return x
  else
    match node.data with
    | Some x ->
      consume s node;
      last_new_rec node.next x s
    | None ->
      Lwt.return x

(* Return the most recent element that is immediately available; when none
   is, behave like [next]. Fails with [Empty] on an exhausted stream. *)
let last_new s =
  let node = s.node in
  if node == !(s.last) then
    let thread = next s in
    match Lwt.state thread with
    | Lwt.Return x ->
      last_new_rec node x s
    | Lwt.Fail _ | Lwt.Sleep ->
      thread
  else
    match node.data with
    | Some x ->
      consume s node;
      last_new_rec node.next x s
    | None ->
      Lwt.fail Empty

(* Consume the whole stream into a list. *)
let rec to_list_rec node acc s =
  if node == !(s.last) then
    feed s >>= fun () -> to_list_rec node acc s
  else
    match node.data with
    | Some x ->
      consume s node;
      to_list_rec node.next (x :: acc) s
    | None ->
      Lwt.return (List.rev acc)

let to_list s = to_list_rec s.node [] s

(* Consume a whole character stream into a string. *)
let rec to_string_rec node buf s =
  if node == !(s.last) then
    feed s >>= fun () -> to_string_rec node buf s
  else
    match node.data with
    | Some x ->
      consume s node;
      Buffer.add_char buf x;
      to_string_rec node.next buf s
    | None ->
      Lwt.return (Buffer.contents buf)

let to_string s = to_string_rec s.node (Buffer.create 128) s

(* Discard the first element, if there is one. *)
let junk s =
  let node = s.node in
  if node == !(s.last) then begin
    feed s >>= fun () ->
    if node.data <> None then consume s node;
    Lwt.return_unit
  end else begin
    if node.data <> None then consume s node;
    Lwt.return_unit
  end

(* Discard up to [n] elements. *)
let rec njunk_rec node n s =
  if n <= 0 then
    Lwt.return_unit
  else if node == !(s.last) then
    feed s >>= fun () -> njunk_rec node n s
  else
    match node.data with
    | Some _ ->
      consume s node;
      njunk_rec node.next (n - 1) s
    | None ->
      Lwt.return_unit

let njunk n s = njunk_rec s.node n s

(* Discard elements for as long as they satisfy [f]. *)
let rec junk_while_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> junk_while_rec node f s
  else
    match node.data with
    | Some x ->
      let test = f x in
      if test then begin
        consume s node;
        junk_while_rec node.next f s
      end else
        Lwt.return_unit
    | None ->
      Lwt.return_unit

let junk_while f s = junk_while_rec s.node f s

(* Same as [junk_while_rec] with a promise-returning predicate. *)
let rec junk_while_s_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> junk_while_s_rec node f s
  else
    match node.data with
    | Some x -> begin
        f x >>= function
        | true ->
          consume s node;
          junk_while_s_rec node.next f s
        | false ->
          Lwt.return_unit
      end
    | None ->
      Lwt.return_unit

let junk_while_s f s = junk_while_s_rec s.node f s

(* Discard every element obtainable without sleeping. *)
let rec junk_old_rec node s =
  if node == !(s.last) then
    let thread = feed s in
    match Lwt.state thread with
    | Lwt.Return _ ->
      junk_old_rec node s
    | Lwt.Fail exn ->
      Lwt.fail exn
    | Lwt.Sleep ->
      Lwt.return_unit
  else
    match node.data with
    | Some _ ->
      consume s node;
      junk_old_rec node.next s
    | None ->
      Lwt.return_unit

let junk_old s = junk_old_rec s.node s

(* Consume and return, synchronously, every element obtainable without
   sleeping. A failed [feed] is re-raised as an exception. *)
let rec get_available_rec node acc s =
  if node == !(s.last) then
    let thread = feed s in
    match Lwt.state thread with
    | Lwt.Return _ ->
      get_available_rec node acc s
    | Lwt.Fail exn ->
      raise exn
    | Lwt.Sleep ->
      List.rev acc
  else
    match node.data with
    | Some x ->
      consume s node;
      get_available_rec node.next (x :: acc) s
    | None ->
      List.rev acc

let get_available s = get_available_rec s.node [] s

(* Same as [get_available_rec], stopping after [n] elements.
   The cursor's own data is inspected ([node.data]) rather than the stream
   head, consistently with [get_available_rec]. *)
let rec get_available_up_to_rec node acc n s =
  if n <= 0 then
    List.rev acc
  else if node == !(s.last) then
    let thread = feed s in
    match Lwt.state thread with
    | Lwt.Return _ ->
      get_available_up_to_rec node acc n s
    | Lwt.Fail exn ->
      raise exn
    | Lwt.Sleep ->
      List.rev acc
  else
    match node.data with
    | Some x ->
      consume s node;
      get_available_up_to_rec node.next (x :: acc) (n - 1) s
    | None ->
      List.rev acc

let get_available_up_to n s = get_available_up_to_rec s.node [] n s

(* [true] when the next cell of [s] is the end-of-stream marker. *)
let rec is_empty s =
  if s.node == !(s.last) then
    feed s >>= fun () -> is_empty s
  else
    Lwt.return (s.node.data = None)

(* Stream of [f x] for every [x] of [s]. *)
let map f s =
  from (fun () ->
    get s >|= function
    | Some x ->
      let x = f x in
      Some x
    | None ->
      None)

(* Same as [map] with a promise-returning function. *)
let map_s f s =
  from (fun () ->
    get s >>= function
    | Some x ->
      f x >|= (fun x -> Some x)
    | None ->
      Lwt.return_none)

(* Stream of the elements of [s] satisfying [f]. *)
let filter f s =
  let rec next () =
    let t = get s in
    t >>= function
    | Some x ->
      let test = f x in
      if test then t else next ()
    | None ->
      Lwt.return_none
  in
  from next

(* Same as [filter] with a promise-returning predicate. *)
let filter_s f s =
  let rec next () =
    let t = get s in
    t >>= function
    | Some x -> begin
        f x >>= function
        | true -> t
        | false -> next ()
      end
    | None ->
      t
  in
  from next

(* Stream of the [y] such that [f x = Some y] for [x] in [s]. *)
let filter_map f s =
  let rec next () =
    get s >>= function
    | Some x ->
      let x = f x in
      (match x with
       | Some _ -> Lwt.return x
       | None -> next ())
    | None ->
      Lwt.return_none
  in
  from next

(* Same as [filter_map] with a promise-returning function. *)
let filter_map_s f s =
  let rec next () =
    get s >>= function
    | Some x ->
      let t = f x in
      (t >>= function
        | Some _ -> t
        | None -> next ())
    | None ->
      Lwt.return_none
  in
  from next

(* Stream of the elements of the lists [f x], in order, for [x] in [s].
   [pendings] holds the not-yet-delivered rest of the current list. *)
let map_list f s =
  let pendings = ref [] in
  let rec next () =
    match !pendings with
    | [] ->
      (get s >>= function
        | Some x ->
          let l = f x in
          pendings := l;
          next ()
        | None ->
          Lwt.return_none)
    | x :: l ->
      pendings := l;
      Lwt.return (Some x)
  in
  from next

(* Same as [map_list] with a promise-returning function. *)
let map_list_s f s =
  let pendings = ref [] in
  let rec next () =
    match !pendings with
    | [] ->
      (get s >>= function
        | Some x ->
          f x >>= fun l ->
          pendings := l;
          next ()
        | None ->
          Lwt.return_none)
    | x :: l ->
      pendings := l;
      Lwt.return (Some x)
  in
  from next

let flatten s =
  map_list (fun l -> l) s

(* Consume the stream, folding [f] over its elements. *)
let rec fold_rec node f s acc =
  if node == !(s.last) then
    feed s >>= fun () -> fold_rec node f s acc
  else
    match node.data with
    | Some x ->
      consume s node;
      let acc = f x acc in
      fold_rec node.next f s acc
    | None ->
      Lwt.return acc

let fold f s acc = fold_rec s.node f s acc

(* Same as [fold_rec] with a promise-returning function. *)
let rec fold_s_rec node f s acc =
  if node == !(s.last) then
    feed s >>= fun () -> fold_s_rec node f s acc
  else
    match node.data with
    | Some x ->
      consume s node;
      f x acc >>= fun acc ->
      fold_s_rec node.next f s acc
    | None ->
      Lwt.return acc

let fold_s f s acc = fold_s_rec s.node f s acc

(* Consume the stream, applying [f] to every element. *)
let rec iter_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> iter_rec node f s
  else
    match node.data with
    | Some x ->
      consume s node;
      let () = f x in
      iter_rec node.next f s
    | None ->
      Lwt.return_unit

let iter f s = iter_rec s.node f s

(* Same as [iter_rec]; waits for each [f x] before moving on. *)
let rec iter_s_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> iter_s_rec node f s
  else
    match node.data with
    | Some x ->
      consume s node;
      f x >>= fun () ->
      iter_s_rec node.next f s
    | None ->
      Lwt.return_unit

let iter_s f s = iter_s_rec s.node f s

(* Same as [iter_rec]; the traversal continues before [f x] is awaited. *)
let rec iter_p_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> iter_p_rec node f s
  else
    match node.data with
    | Some x ->
      consume s node;
      let res = f x in
      let rest = iter_p_rec node.next f s in
      res >>= fun () -> rest
    | None ->
      Lwt.return_unit

let iter_p f s = iter_p_rec s.node f s

(* Apply [f] to every element of [stream], keeping at most
   [max_concurrency] applications running at once.
   Raises [Invalid_argument] when [max_concurrency] is not positive. *)
let iter_n ?(max_concurrency = 1) f stream =
  begin
    if max_concurrency <= 0 then
      let message =
        Printf.sprintf
          "Lwt_stream.iter_n: max_concurrency must be > 0, %d given"
          max_concurrency
      in
      invalid_arg message
  end;
  (* [running]: promises in flight; [available]: free slots. *)
  let rec loop running available =
    begin
      if available > 0 then (
        Lwt.return (running, available)
      )
      else (
        (* No free slot: wait until at least one promise completes. *)
        Lwt.nchoose_split running >>= fun (complete, running) ->
        Lwt.return (running, available + List.length complete)
      )
    end >>= fun (running, available) ->
    get stream >>= function
    | None ->
      Lwt.join running
    | Some elt ->
      loop (f elt :: running) (pred available)
  in
  loop [] max_concurrency

(* Consume elements up to and including the first one satisfying [f], and
   return it. *)
let rec find_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> find_rec node f s
  else
    match node.data with
    | Some x as opt ->
      consume s node;
      let test = f x in
      if test then
        Lwt.return opt
      else
        find_rec node.next f s
    | None ->
      Lwt.return_none

let find f s = find_rec s.node f s

(* Same as [find_rec] with a promise-returning predicate. *)
let rec find_s_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> find_s_rec node f s
  else
    match node.data with
    | Some x as opt -> begin
        consume s node;
        f x >>= function
        | true ->
          Lwt.return opt
        | false ->
          find_s_rec node.next f s
      end
    | None ->
      Lwt.return_none

let find_s f s = find_s_rec s.node f s

(* Consume elements until [f] returns [Some _], and return that result. *)
let rec find_map_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> find_map_rec node f s
  else
    match node.data with
    | Some x ->
      consume s node;
      let x = f x in
      if x = None then
        find_map_rec node.next f s
      else
        Lwt.return x
    | None ->
      Lwt.return_none

let find_map f s = find_map_rec s.node f s

(* Same as [find_map_rec] with a promise-returning function. *)
let rec find_map_s_rec node f s =
  if node == !(s.last) then
    feed s >>= fun () -> find_map_s_rec node f s
  else
    match node.data with
    | Some x ->
      consume s node;
      let t = f x in
      (t >>= function
        | None ->
          find_map_s_rec node.next f s
        | Some _ ->
          t)
    | None ->
      Lwt.return_none

let find_map_s f s = find_map_s_rec s.node f s

(* Stream of pairs taken from [s1] and [s2]; ends as soon as either input
   yields [None]. *)
let combine s1 s2 =
  let next () =
    let t1 = get s1 and t2 = get s2 in
    t1 >>= fun n1 ->
    t2 >>= fun n2 ->
    match n1, n2 with
    | Some x1, Some x2 ->
      Lwt.return (Some (x1, x2))
    | _ ->
      Lwt.return_none
  in
  from next

(* Stream of the elements of [s1] followed by those of [s2]. *)
let append s1 s2 =
  let current_s = ref s1 in
  let rec next () =
    let t = get !current_s in
    t >>= function
    | Some _ ->
      t
    | None ->
      if !current_s == s2 then
        Lwt.return_none
      else begin
        current_s := s2;
        next ()
      end
  in
  from next

(* Stream of the elements of each stream produced by [s_top], in order. *)
let concat s_top =
  (* Start from an empty stream so that the first [next] moves on to
     [s_top]. *)
  let current_s = ref (from (fun () -> Lwt.return_none)) in
  let rec next () =
    let t = get !current_s in
    t >>= function
    | Some _ ->
      t
    | None ->
      get s_top >>= function
      | Some s ->
        current_s := s;
        next ()
      | None ->
        Lwt.return_none
  in
  from next

(* Stream yielding elements from whichever of [streams] has one ready.
   Each live input is paired with its pending [get]; an input is dropped
   once it yields [None]. *)
let choose streams =
  let source s = (s, get s >|= fun x -> (s, x)) in
  let streams = ref (List.map source streams) in
  let rec next () =
    match !streams with
    | [] ->
      Lwt.return_none
    | l ->
      Lwt.choose (List.map snd l) >>= fun (s, x) ->
      let l = List.remove_assq s l in
      match x with
      | Some _ ->
        streams := source s :: l;
        Lwt.return x
      | None ->
        streams := l;
        next ()
  in
  from next

(* Run [f s]; if it fails, restore the stream head to where it was before
   the call. Raises [Invalid_argument] for bounded-push streams. *)
let parse s f =
  (match s.source with
   | Push_bounded _ -> invalid_arg "Lwt_stream.parse"
   | From _ | From_direct _ | Push _ -> ());
  let node = s.node in
  Lwt.catch
    (fun () -> f s)
    (fun exn ->
       s.node <- node;
       Lwt.fail exn)

(* Stream of hex-dump lines for a character stream, 16 bytes per line:
   an offset, the bytes in hexadecimal (with an extra space before the
   ninth byte), then the printable ASCII rendering between '|'. *)
let hexdump stream =
  let buf = Buffer.create 80 and num = ref 0 in
  from begin fun _ ->
    nget 16 stream >>= function
    | [] ->
      Lwt.return_none
    | l ->
      Buffer.clear buf;
      Printf.bprintf buf "%08x| " !num;
      num := !num + 16;
      let rec bytes pos = function
        | [] ->
          blanks pos
        | x :: l ->
          if pos = 8 then Buffer.add_char buf ' ';
          Printf.bprintf buf "%02x " (Char.code x);
          bytes (pos + 1) l
      and blanks pos =
        (* Pad a short final row so that the ASCII column stays aligned:
           every missing byte takes the same three columns as "%02x ",
           plus the separator that precedes byte 8. *)
        if pos < 16 then begin
          if pos = 8 then Buffer.add_char buf ' ';
          Buffer.add_string buf "   ";
          blanks (pos + 1)
        end
      in
      bytes 0 l;
      Buffer.add_string buf " |";
      List.iter
        (fun ch ->
           Buffer.add_char buf
             (if ch >= '\x20' && ch <= '\x7e' then ch else '.'))
        l;
      Buffer.add_char buf '|';
      Lwt.return (Some (Buffer.contents buf))
  end