Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file scheduler.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457openImportletwith_mutexm~f=Mutex.lockm;letres=f()inMutex.unlockm;resmoduleMvar:sigtype'atvalcreate:unit->'atvalget:'at->'avalset:'at->'a->unitend=structtype'at={m:Mutex.t;cv:Condition.t;mutablecell:'aoption}letcreate()={m=Mutex.create();cv=Condition.create();cell=None}letgett=letrecawait_valuet=matcht.cellwith|None->Condition.waitt.cvt.m;await_valuet|Somev->t.cell<-None;vinwith_mutext.m~f:(fun()->await_valuet)letsettv=with_mutext.m~f:(fun()->t.cell<-Somev);Condition.signalt.cvendmoduleWorker:sig(** Simple queue that is consumed by its own thread *)type'worktvalcreate:do_:('a->unit)->'attypetaskvalcancel:task->unitvaladd_work:'at->'a->(task,[`Stopped])resultvalstop:_t->unitend=structtypestate=|RunningofThread.t|StoppedofThread.t|Finishedtype'at={work:'aRemovable_queue.t;mutablestate:state;mutex:Mutex.t;work_available:Condition.t}andtask=Task:'at*'aRemovable_queue.node->taskletcancel(Task(t,node))=with_mutext.mutex~f:(fun()->Removable_queue.removenode)letis_runningt=matcht.statewith|Running_->true|Stopped_|Finished->falseletrun(f,t)=letrecloop()=matcht.statewith|Stopped_->(matchRemovable_queue.popt.workwith|None->t.state<-Finished|Somejob->do_workjob)|Finished->()|Running_->(matchRemovable_queue.popt.workwith|Somejob->do_workjob|None->whileRemovable_queue.is_emptyt.work&&is_runningtdoCondition.waitt.work_availablet.mutexdone;loop())anddo_workjob=Mutex.unlockt.mutex;fjob;Mutex.lockt.mutex;loop()inwith_mutext.mutex~f:loopletcreate~do_=lett={work=Removable_queue.create();state=Finished;mutex=Mutex.create();work_available=Condition.create()}int.state<-Running(Thread.createrun(do_,t));tletadd_work(typea)(t:at)(w:a)=with_mutext.mutex~f:(fun()->ifis_runningtthen(letnode=Removable_queue.pusht.workwinCondition.signalt.work_available;Ok(Task(t,node)))elseError`Stopped)letstop(t:_t)=with_mutext.mutex~f:(fun()->matcht.statewith|Runningth->t.state<-Stoppedth;Condition.signalt.work_available|Stopped_|Finished->())endmoduleTimer_id=Id.Make()typet={mutableevents_pending:int;events:eventQueue.t;events_mutex:Mutex.t;time_mutex:Mutex.t;event_ready:Condition.t;timers_available:Condition.t;timers_available_mutex:Mutex.t;mutablethreads:threadlist;earliest_wakeup:floatMvar.t;mutabletime:Thread.t;mutablewaker:Thread.t;(* TODO Replace with Removable_queue *)timers:(Timer_id.t,active_timerref)Table.t}andevent=|Job_completed:'a*'aFiber.Ivar.t->event|Scheduledofactive_timer|Abortandjob=|Pending:(unit->'a)*('a,[`ExnofExn.t|`Canceled])resultFiber.Ivar.t->jobandthread={scheduler:t;worker:jobWorker.t}andtimer={mutabledelay:float;timer_scheduler:t;timer_id:Timer_id.t}andactive_timer={scheduled:float;ivar:[`Resolved|`Cancelled]Fiber.Ivar.t;parent:timer}letadd_eventst=function|[]->()|events->with_mutext.events_mutex~f:(fun()->List.iterevents~f:(Queue.pusht.events);Condition.signalt.event_ready)letis_emptytable=Table.lengthtable=0letme=Fiber.Var.create()letscheduler()=Fiber.Var.get_exnmeletsignal_timers_availablet=with_mutext.timers_available_mutex~f:(fun()->Condition.signalt.timers_available)lettime_loopt=letrecloop()=letto_run=ref[]inletearliest_next=refNoneinwith_mutext.time_mutex~f:(fun()->ifis_emptyt.timersthen()elseletnow=Unix.gettimeofday()inTable.filteri_inplacet.timers~f:(fun~key:_~data:active_timer->letactive_timer=!active_timerinletscheduled_at=active_timer.scheduled+.active_timer.parent.delayinletneed_to_run=scheduled_at<nowinifneed_to_runthento_run:=Scheduledactive_timer::!to_runelseearliest_next:=Some(match!earliest_nextwith|None->scheduled_at|Somev->minscheduled_atv);notneed_to_run));add_eventst!to_run;Option.iter!earliest_next~f:(Mvar.sett.earliest_wakeup);with_mutext.timers_available_mutex~f:(fun()->Condition.waitt.timers_availablet.timers_available_mutex);loop()inloop()letwake_loopt=letrecloop()=letwakeup_at=Mvar.gett.earliest_wakeupinletnow=Unix.gettimeofday()inifnow<wakeup_atthenThread.delay(wakeup_at-.now);signal_timers_availablet;loop()inloop()letcreate()=lett={events_pending=0;events=Queue.create();events_mutex=Mutex.create();time_mutex=Mutex.create();event_ready=Condition.create();threads=[];timers=Table.create(moduleTimer_id)10;timers_available=Condition.create();timers_available_mutex=Mutex.create();time=Thread.self();earliest_wakeup=Mvar.create();waker=Thread.self()}int.time<-Thread.createtime_loopt;t.waker<-Thread.createwake_loopt;tletcreate_threadscheduler=letworker=letdo_(Pending(f,ivar))=letres=matchResult.try_withfwith|Okx->Okx|Errorexn->Error(`Exnexn)inadd_eventsscheduler[Job_completed(res,ivar)]inWorker.create~do_inlett={scheduler;worker}inscheduler.threads<-t::scheduler.threads;tletadd_pending_eventstby=with_mutext.events_mutex~f:(fun()->t.events_pending<-t.events_pending+by;assert(t.events_pending>=0))type'atask={ivar:('a,[`ExnofExn.t|`Canceled])resultFiber.Ivar.t;task:Worker.task}letawaittask=Fiber.Ivar.readtask.ivarletawait_no_canceltask=letopenFiber.Oinlet+res=Fiber.Ivar.readtask.ivarinmatchreswith|Okx->Okx|Error`Canceled->assertfalse|Error(`Exnexn)->Errorexnletcancel_tasktask=letopenFiber.Oinlet*status=Fiber.Ivar.peektask.ivarinmatchstatuswith|Some_->Fiber.return()|None->Worker.canceltask.task;Fiber.Ivar.filltask.ivar(Error`Canceled)letasync(t:thread)f=add_pending_eventst.scheduler1;letivar=Fiber.Ivar.create()inletwork=Worker.add_workt.worker(Pending(f,ivar))inResult.mapwork~f:(funtask->{ivar;task})letasync_exntf=matchasynctfwith|Error`Stopped->Code_error.raise"async_exn: stopped thread"[]|Oktask->taskletstop(t:thread)=Worker.stopt.workerletcancel_timerst=lettimers=ref[]inwith_mutext.time_mutex~f:(fun()->Table.filteri_inplacet.timers~f:(fun~key:_~data:timer->timers:=!timer.ivar::!timers;false));Fiber.parallel_iter!timers~f:(funivar->Fiber.Ivar.fillivar`Cancelled)letcleanupt=List.itert.threads~f:stoptyperun_error=|Never|Abort_requested|ExnofExn.texceptionAbortofrun_errorletevent_next(t:t):Fiber.fill=with_mutext.events_mutex~f:(fun()->whileQueue.is_emptyt.eventsdoCondition.waitt.event_readyt.events_mutexdone;letconsume_event()=letres=Queue.pop_exnt.eventsint.events_pending<-t.events_pending-1;assert(t.events_pending>=0);resinifQueue.is_emptyt.eventsthenError(AbortNever)elsematchconsume_event()with|Abort->Error(AbortAbort_requested)|Job_completed(a,ivar)->Ok(Fiber.Fill(ivar,a))|Scheduledactive_timer->Ok(Fill(active_timer.ivar,`Resolved)))|>Result.ok_exnletreportt=letstatusm=ifMutex.try_lockmthen"was unlocked"else"locked"in[("time_mutex",t.time_mutex);("timers_available_mutex",t.timers_available_mutex);("events_mutex",t.events_mutex)]|>List.iter~f:(fun(name,mutex)->Format.eprintf"%s: %s@."name(statusmutex));Format.eprintf"pending events: %d@."t.events_pending;Format.eprintf"events: %d@."(Queue.lengtht.events);Format.eprintf"threads: %d@."(List.lengtht.threads);Format.eprintf"timers: %d@."(Table.lengtht.timers)letiter(t:t)=ift.events_pending=0then(let()=assert(Queue.is_emptyt.events)inreportt;raise(AbortNever))elseevent_nexttletrun_result:'a.t->'aFiber.t->('a,_)result=funtf->letf=Fiber.Var.setmet(fun()->f)inletiter()=itertinletres=matchFiber.runf~iterwith|exceptionAborterr->Errorerr|exceptionexn->Error(Exnexn)|res->assert(t.events_pending=0);Okresincleanupt;resletruntf=matchrun_resulttfwith|Oks->s|Errore->raise(Aborte)letcreate_timert~delay={timer_scheduler=t;delay;timer_id=Timer_id.gen()}letset_delayt~delay=t.delay<-delayletschedule(typea)(timer:timer)(f:unit->aFiber.t):(a,[`Cancelled])resultFiber.t=letopenFiber.Oinletactive_timer=letscheduled=Unix.gettimeofday()in{scheduled;ivar=Fiber.Ivar.create();parent=timer}inlet*()=matchwith_mutextimer.timer_scheduler.time_mutex~f:(fun()->matchTable.findtimer.timer_scheduler.timerstimer.timer_idwith|Someactive->letto_cancel=!active.ivarinactive:=active_timer;`Cancelto_cancel|None->Table.add_exntimer.timer_scheduler.timerstimer.timer_id(refactive_timer);`Signal_timers_available)with|`Cancelivar->Fiber.Ivar.fillivar`Cancelled|`Signal_timers_available->add_pending_eventstimer.timer_scheduler1;signal_timers_availabletimer.timer_scheduler;Fiber.return()inlet*res=Fiber.Ivar.readactive_timer.ivarinmatchreswith|`Cancelledase->Fiber.return(Errore)|`Resolved->let+res=f()inOkresletcancel_timer(timer:timer)=lett=timer.timer_schedulerinmatchwith_mutext.time_mutex~f:(fun()->matchTable.findt.timerstimer.timer_idwith|None->None|Someat->Table.removet.timerstimer.timer_id;Some!at.ivar)with|None->Fiber.return()|Someivar->with_mutext.events_mutex~f:(fun()->t.events_pending<-t.events_pending-1);Fiber.Ivar.fillivar`Cancelledletabortt=(* TODO proper cleanup *)add_eventst[Abort]