(* Source file fiber.ml *)

open! Stdune

(* A fiber producing a value of type ['a] is represented in
   continuation-passing style: it is a function that receives the continuation
   to call with the result. *)
type 'a t = ('a -> unit) -> unit

(* This module tries to enforce the following invariants:

   - the execution context passed to a continuation is the same as the current
     one

   - the execution of a fiber always ends with [deref]

   - when an exception is raised by the user code, the exception must be
     forwarded to the execution context that was active at the time the
     exception was raised

   - when an exception is raised by the user code, then we assume that the
     current fiber didn't reach the [deref] point. As a result we have to call
     [deref] at this point on the current execution context

   Remarks:

   - most of the code assumes that errors will be caught by the caller, so when
     we do a context switch, we simply change the current execution context and
     chain to the continuation without catching errors. The current [try..with]
     will catch any raised error and forward to the current execution context.
     The only place we add a [try..with] is at the toplevel or when forking. *)

(* [of_thunk f] is the fiber that calls [f ()] only once a continuation is
   supplied, and then runs the resulting fiber with that continuation. *)
let of_thunk f k = f () k

module Execution_context : sig
  module K : sig
    (* Represent a suspended fiber *)
    type 'a t

    (* Create a continuation that captures the current execution context *)
    val create : ('a -> unit) -> 'a t

    (* Restart a suspended fiber. [run] doesn't preserve the current execution
       context and should always be called last. *)
    val run : 'a t -> 'a -> unit
  end

  (* Execute the current continuation, making sure to forward errors to the
     current execution context. This function doesn't preserve the current
     execution context. It should be used to execute the current continuation
     before calling [K.run] *)
  val safe_run_k : ('a -> unit) -> 'a -> unit

  (* Execute a function returning a fiber, passing any raised exception to the
     current execution context. This function preserves the current execution
     context. It should be called when creating forks. *)
  val apply : ('a -> 'b t) -> 'a -> 'b t

  (* Add [n] references to the current execution context *)
  val add_refs : int -> unit

  (* Decrease the reference count of the current execution context *)
  val deref : unit -> unit

  (* [wait_errors f] executes [f ()] inside a new execution context. Returns a
     fiber that terminates when all the fibers in the sub-context have
     terminated. *)
  val wait_errors : (unit -> 'a t) -> ('a, unit) result t

  (* Set the current error handler. [on_error] is called in the current
     execution context. *)
  val set_error_handler :
    on_error:(Exn_with_backtrace.t -> unit) -> ('a -> 'b t) -> 'a -> 'b t

  (* Variables of the current execution context *)
  val vars : unit -> Univ_map.t

  (* Run a fiber-returning function with the given variables installed; the
     previous context is restored before the continuation is called. *)
  val set_vars : Univ_map.t -> ('a -> 'b t) -> 'a -> 'b t

  (* Same as [set_vars] for a plain (non-fiber) function. *)
  val set_vars_sync : Univ_map.t -> ('a -> 'b) -> 'a -> 'b

  (* Execute a callback with a fresh execution context. For the toplevel
     [Fiber.run] function. *)
  val new_run : (unit -> 'a) -> 'a
end = struct
  (* An execution context: an optional error handler, the fiber-local
     variables, and what to do when the context is released. *)
  type t =
    { on_error : Exn_with_backtrace.t k option
          (* This handler must never raise *)
    ; vars : Univ_map.t
    ; on_release : on_release
    }

  (* State of a context created by [wait_errors]: the continuation to resume,
     the result to hand to it, and the number of live references. *)
  and 'a on_release_exec =
    { k : ('a, unit) result k
    ; mutable result : ('a, unit) result
    ; mutable ref_count : int
    }

  and on_release =
    | Do_nothing : on_release
    | Exec : _ on_release_exec -> on_release

  (* A continuation paired with the execution context it must run in. *)
  and 'a k =
    { run : 'a -> unit
    ; ctx : t
    }

  (* The current execution context, held in a single global reference. *)
  let current =
    ref { on_error = None; vars = Univ_map.empty; on_release = Do_nothing }

  let add_refs n =
    let t = !current in
    match t.on_release with
    | Do_nothing -> ()
    | Exec r -> r.ref_count <- r.ref_count + n

  (* Drop one reference from [t]. When the count reaches zero, switch to the
     context captured in the release continuation and run it with the stored
     result. *)
  let rec deref t =
    match t.on_release with
    | Do_nothing -> ()
    | Exec r ->
      let n = r.ref_count - 1 in
      assert (n >= 0);
      r.ref_count <- n;
      if n = 0 then (
        current := r.k.ctx;
        (* We need to call [safe_run_k] as we might be in the handler of the
           [try...with] block inside [apply] and so we are no longer in a
           [try...with] block *)
        safe_run_k r.k.run r.result
      )

  and safe_run_k : type a. (a -> unit) -> a -> unit =
   fun k x -> try k x with exn -> forward_error exn

  (* Forward [exn] to the error handler of the current context. If the handler
     itself raises, the new exception is forwarded to the handler of the
     context the failing handler was registered in, and so on. With no handler
     the exception is re-raised. Once forwarding returns, the context that was
     current on entry is dereferenced. *)
  and forward_error =
    let rec loop t exn =
      match t.on_error with
      | None -> Exn_with_backtrace.reraise exn
      | Some { ctx; run } -> (
        current := ctx;
        try run exn
        with exn ->
          let exn = Exn_with_backtrace.capture exn in
          loop ctx exn )
    in
    fun exn ->
      let exn = Exn_with_backtrace.capture exn in
      let t = !current in
      loop t exn;
      deref t

  (* Public [deref]: acts on the current context. Shadows the internal one. *)
  let deref () = deref !current

  let wait_errors f k =
    let t = !current in
    (* The result stays [Error ()] unless [f ()] reaches its continuation. *)
    let on_release =
      { k = { ctx = t; run = k }; ref_count = 1; result = Error () }
    in
    let child = { t with on_release = Exec on_release } in
    current := child;
    f () (fun x ->
        on_release.result <- Ok x;
        deref ())

  let set_error_handler ~on_error f x k =
    let t = !current in
    (* The handler runs in [t], the context current at installation time. *)
    let on_error = Some { run = on_error; ctx = t } in
    current := { t with on_error };
    f x (fun x ->
        current := t;
        k x)

  let vars () = !current.vars

  let set_vars vars f x k =
    let t = !current in
    current := { t with vars };
    f x (fun x ->
        current := t;
        k x)

  let set_vars_sync (type b) vars f x : b =
    let t = !current in
    current := { t with vars };
    (* Restore the previous context whether [f x] returns or raises. *)
    Exn.protect ~finally:(fun () -> current := t) ~f:(fun () -> f x)

  module K = struct
    type 'a t = 'a k

    let create run = { run; ctx = !current }

    let run { run; ctx } x =
      current := ctx;
      safe_run_k run x
  end

  let apply f x k =
    let backup = !current in
    (try f x k with exn -> forward_error exn);
    current := backup

  let new_run f =
    let backup = !current in
    Exn.protect
      ~finally:(fun () -> current := backup)
      ~f:(fun () ->
        current :=
          { on_error = None; vars = Univ_map.empty; on_release = Do_nothing };
        f ())
end

module EC = Execution_context
module K = EC.K

(* [return x] immediately passes [x] to the continuation. *)
let return x k = k x

(* [never] drops its continuation without ever calling it. *)
let never _ = ()

module O = struct
  (* [a >>> b] runs [a], then [b]; [a] must produce [unit]. *)
  let ( >>> ) a b k = a (fun () -> b k)

  (* Monadic bind. *)
  let ( >>= ) t f k = t (fun x -> f x k)

  (* Map over the result of a fiber. *)
  let ( >>| ) t f k = t (fun x -> k (f x))

  let ( let+ ) = ( >>| )

  let ( let* ) = ( >>= )
end

open O

let map t ~f = t >>| f

let bind t ~f = t >>= f

(* Run [a] then [b] sequentially and return both results as a pair. *)
let both a b =
  let* x = a in
  let* y = b in
  return (x, y)

(* Apply [f] to each element in order, each call waiting for the previous one,
   and return the results in the same order as the input list. *)
let sequential_map l ~f =
  let rec loop l acc =
    match l with
    | [] -> return (List.rev acc)
    | x :: l ->
      let* x = f x in
      loop l (x :: acc)
  in
  loop l []

(* Same as [sequential_map] for functions producing [unit]. *)
let sequential_iter l ~f =
  let rec loop l =
    match l with
    | [] -> return ()
    | x :: l ->
      let* () = f x in
      loop l
  in
  loop l

(* Which side of a fork, if any, has already produced its value. *)
type ('a, 'b) fork_and_join_state =
  | Nothing_yet
  | Got_a of 'a
  | Got_b of 'b

(* Start [fa ()] and [fb ()] and call the continuation with both results once
   both are available. The side that finishes first stores its value and
   releases its reference; the side that finishes second calls [k]. *)
let fork_and_join fa fb k =
  let state = ref Nothing_yet in
  EC.add_refs 1;
  EC.apply fa () (fun a ->
      match !state with
      | Nothing_yet ->
        state := Got_a a;
        EC.deref ()
      | Got_a _ -> assert false
      | Got_b b -> k (a, b));
  fb () (fun b ->
      match !state with
      | Nothing_yet ->
        state := Got_b b;
        EC.deref ()
      | Got_a a -> k (a, b)
      | Got_b _ -> assert false)

(* Same as [fork_and_join] when [fa] produces [unit]: only the result of [fb]
   is passed to the continuation. *)
let fork_and_join_unit fa fb k =
  let state = ref Nothing_yet in
  EC.add_refs 1;
  EC.apply fa () (fun () ->
      match !state with
      | Nothing_yet ->
        state := Got_a ();
        EC.deref ()
      | Got_a _ -> assert false
      | Got_b b -> k b);
  fb () (fun b ->
      match !state with
      | Nothing_yet ->
        state := Got_b b;
        EC.deref ()
      | Got_a () -> k b
      | Got_b _ -> assert false)

module Sequence = struct
  type 'a fiber = 'a t

  (* A sequence whose next node is itself obtained by running a fiber. *)
  type 'a t = 'a node fiber

  and 'a node =
    | Nil
    | Cons of 'a * 'a t

  let rec sequential_iter t ~f =
    t >>= function
    | Nil -> return ()
    | Cons (x, t) ->
      let* () = f x in
      sequential_iter t ~f

  let parallel_iter t ~f k =
    (* [n] counts the pending completions: one for the traversal of the
       sequence itself plus one per element handed to [f]. *)
    let n = ref 1 in
    let k () =
      decr n;
      if !n = 0 then
        k ()
      else
        EC.deref ()
    in
    let rec loop t =
      t (function
        | Nil -> k ()
        | Cons (x, t) ->
          EC.add_refs 1;
          incr n;
          EC.apply f x k;
          loop t)
    in
    loop t
end

(* Convert an array of options to a list, preserving order. Every cell must be
   [Some _]; a [None] cell triggers an assertion failure. *)
let list_of_option_array =
  let rec loop arr i acc =
    if i = 0 then
      acc
    else
      let i = i - 1 in
      match arr.(i) with
      | None -> assert false
      | Some x -> loop arr i (x :: acc)
  in
  fun a -> loop a (Array.length a) []

(* Apply [f] to every element, forking one fiber per element, and pass the
   results to the continuation in the order of the input list. *)
let parallel_map l ~f k =
  match l with
  | [] -> k []
  | [ x ] -> f x (fun x -> k [ x ])
  | _ ->
    let n = List.length l in
    EC.add_refs (n - 1);
    let left_over = ref n in
    let results = Array.make n None in
    List.iteri l ~f:(fun i x ->
        EC.apply f x (fun y ->
            results.(i) <- Some y;
            decr left_over;
            (* The last fiber to finish continues; the others release their
               reference. *)
            if !left_over = 0 then
              k (list_of_option_array results)
            else
              EC.deref ()))

(* Same as [parallel_map] for functions producing [unit]. *)
let parallel_iter l ~f k =
  match l with
  | [] -> k ()
  | [ x ] -> f x k
  | _ ->
    let n = List.length l in
    EC.add_refs (n - 1);
    let left_over = ref n in
    let k () =
      decr left_over;
      if !left_over = 0 then
        k ()
      else
        EC.deref ()
    in
    List.iter l ~f:(fun x -> EC.apply f x k)

(* Fiber-local variables, stored in the [vars] map of the current execution
   context. *)
module Var = struct
  include Univ_map.Key

  let get var = Univ_map.find (EC.vars ()) var

  let get_exn var = Univ_map.find_exn (EC.vars ()) var

  let set_sync var x f = EC.set_vars_sync (Univ_map.set (EC.vars ()) var x) f ()

  let set var x f k = EC.set_vars (Univ_map.set (EC.vars ()) var x) f () k

  let unset_sync var f =
    EC.set_vars_sync (Univ_map.remove (EC.vars ()) var) f ()

  let unset var f k = EC.set_vars (Univ_map.remove (EC.vars ()) var) f () k

  let create () = create ~name:"var" (fun _ -> Dyn.Encoder.string "var")
end

let with_error_handler f ~on_error k = EC.set_error_handler ~on_error f () k

let wait_errors f k = EC.wait_errors f k

(* Run [f ()] in a sub-context, folding [on_error] over every exception it
   forwards. Returns [Ok x] if [f ()] produced [x], and otherwise [Error acc]
   where [acc] is the accumulated value. *)
let fold_errors f ~init ~on_error =
  let acc = ref init in
  let on_error exn = acc := on_error exn !acc in
  wait_errors (fun () -> with_error_handler ~on_error f) >>| function
  | Ok _ as ok -> ok
  | Error () -> Error !acc

(* Like [fold_errors], collecting the exceptions in the order they were
   reported. *)
let collect_errors f =
  let+ res = fold_errors f ~init:[] ~on_error:(fun e l -> e :: l) in
  match res with
  | Ok x -> Ok x
  | Error l -> Error (List.rev l)

(* Run [f ()], then [finally ()] regardless of whether [f ()] failed. Errors
   collected from both are re-raised afterwards. *)
let finalize f ~finally =
  let* res1 = collect_errors f in
  let* res2 = collect_errors finally in
  let res =
    match (res1, res2) with
    | Ok x, Ok () -> Ok x
    | Error l, Ok _
    | Ok _, Error l ->
      Error l
    | Error l1, Error l2 -> Error (l1 @ l2)
  in
  match res with
  | Ok x -> return x
  | Error l ->
    let* () =
      parallel_iter l ~f:(fun exn -> Exn_with_backtrace.reraise exn)
    in
    (* We might reach this point if all raised errors were handled by the
       user *)
    never

(* Write-once variables. *)
module Ivar = struct
  type 'a state =
    | Full of 'a
    | Empty of 'a K.t Queue.t

  type 'a t = { mutable state : 'a state }

  let create () = { state = Empty (Queue.create ()) }

  (* Fill the ivar, run the continuation of the filler, then wake up every
     queued reader. Fails if the ivar is already full. *)
  let fill t x k =
    match t.state with
    | Full _ -> failwith "Fiber.Ivar.fill"
    | Empty q ->
      t.state <- Full x;
      EC.safe_run_k k ();
      Queue.iter q ~f:(fun k -> K.run k x)

  (* Continue immediately if the ivar is full, otherwise suspend the reader. *)
  let read t k =
    match t.state with
    | Full x -> k x
    | Empty q -> Queue.push q (K.create k)

  let peek t k =
    k
      ( match t.state with
      | Full x -> Some x
      | Empty _ -> None )
end

module Mvar = struct
  type 'a t =
    { writers : ('a * unit K.t) Queue.t
    ; readers : 'a K.t Queue.t
    ; mutable value : 'a option
    }

  (* Invariant enforced on mvars. We don't actually call this function, but we
     keep it here for documentation and to help understand the
     implementation: *)
  let _invariant t =
    match t.value with
    | None -> Queue.is_empty t.writers
    | Some _ -> Queue.is_empty t.readers

  let create () =
    { value = None; writers = Queue.create (); readers = Queue.create () }

  let create_full x =
    { value = Some x; writers = Queue.create (); readers = Queue.create () }

  let read t k =
    match t.value with
    | None -> Queue.push t.readers (K.create k)
    | Some v -> (
      match Queue.pop t.writers with
      | None ->
        t.value <- None;
        k v
      | Some (v', w) ->
        (* A writer was blocked: its value becomes the new content, the reader
           continues first and the writer is resumed last. *)
        t.value <- Some v';
        EC.safe_run_k k v;
        K.run w () )

  let write t x k =
    match t.value with
    | Some _ -> Queue.push t.writers (x, K.create k)
    | None -> (
      match Queue.pop t.readers with
      | None ->
        t.value <- Some x;
        k ()
      | Some r ->
        (* A reader was blocked: the writer continues first and the value is
           handed directly to the reader, which is resumed last. *)
        EC.safe_run_k k ();
        K.run r x )
end

module Mutex = struct
  type t =
    { mutable locked : bool
    ; mutable waiters : unit K.t Queue.t
    }

  let lock t k =
    if t.locked then
      Queue.push t.waiters (K.create k)
    else (
      t.locked <- true;
      k ()
    )

  let unlock t k =
    assert t.locked;
    match Queue.pop t.waiters with
    | None ->
      t.locked <- false;
      k ()
    | Some next ->
      (* The mutex stays locked and ownership passes to the next waiter. *)
      EC.safe_run_k k ();
      K.run next ()

  let with_lock t f =
    let* () = lock t in
    finalize f ~finally:(fun () -> unlock t)

  let create () = { locked = false; waiters = Queue.create () }
end

(* Limit the number of jobs running at the same time. *)
module Throttle = struct
  type t =
    { mutable size : int
    ; mutable running : int
    ; waiting : unit Ivar.t Queue.t
    }

  let create size = { size; running = 0; waiting = Queue.create () }

  let size t = t.size

  let running t = t.running

  (* Wake up waiting jobs for as long as there is spare capacity. *)
  let rec restart t =
    if t.running >= t.size then
      return ()
    else
      match Queue.pop t.waiting with
      | None -> return ()
      | Some ivar ->
        t.running <- t.running + 1;
        let* () = Ivar.fill ivar () in
        restart t

  let resize t n =
    t.size <- n;
    restart t

  let run t ~f =
    finalize
      ~finally:(fun () ->
        t.running <- t.running - 1;
        restart t)
      (fun () ->
        if t.running < t.size then (
          t.running <- t.running + 1;
          f ()
        ) else
          (* No capacity left: wait to be woken up by [restart], which
             increments [running] on our behalf. *)
          let waiting = Ivar.create () in
          Queue.push t.waiting waiting;
          let* () = Ivar.read waiting in
          f ())
end

(* An ivar together with the value to fill it with. *)
type fill = Fill : 'a Ivar.t * 'a -> fill

(* Run a fiber to completion in a fresh execution context. While the fiber has
   not produced a result, [iter ()] is called to obtain the next ivar to
   fill. *)
let run t ~iter =
  EC.new_run (fun () ->
      let result = ref None in
      EC.apply (fun () -> t) () (fun x -> result := Some x);
      let rec loop () =
        match !result with
        | Some res -> res
        | None ->
          let (Fill (ivar, v)) = iter () in
          Ivar.fill ivar v ignore;
          loop ()
      in
      loop ())

(* Start [fa ()] and [fb ()] and call the continuation with the result of
   whichever finishes first, tagged [Left] or [Right].

   NOTE(review): the side that finishes second neither calls [k] nor
   [EC.deref]; confirm this is the intended reference accounting. *)
let fork_and_race fa fb k =
  let state = ref Nothing_yet in
  EC.add_refs 1;
  EC.apply fa () (fun a ->
      match !state with
      | Nothing_yet ->
        EC.deref ();
        state := Got_a ();
        k (Left a)
      | Got_a () -> assert false
      | Got_b () -> ());
  fb () (fun b ->
      match !state with
      | Nothing_yet ->
        EC.deref ();
        state := Got_b ();
        k (Right b)
      | Got_a () -> ()
      | Got_b () -> assert false)

(* Run [t] in the current execution context and return [Some x] if it produced
   [x] before [EC.apply] returned, [None] otherwise. *)
let run2 t =
  let result = ref None in
  EC.apply (fun () -> t) () (fun x -> result := Some x);
  !result