Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file time_source.ml
(* Asynchronous interface over [Synchronous_time_source0]: reading the time, advancing
   a time source, scheduling jobs on its timing wheel, abortable/reschedulable
   [Event]s, and repeated execution ([every], [run_at_intervals], ...). *)

open! Core_kernel
open! Import
open! Deferred_std

let debug = Debug.clock

module Alarm = Timing_wheel.Alarm
module Deferred = Deferred1
module Scheduler = Scheduler1

let upon = Deferred.upon
let choose = Deferred.choose
let choice = Deferred.choice
let ( >>> ) = upon

module T1 = struct
  include Synchronous_time_source0.T1

  (* We don't include the [id] in the sexp because the user (rightly) can't control it, so
     it's hard to make it deterministic in tests. *)
  let sexp_of_t
        _
        { id = _
        ; advance_errors = _
        ; am_advancing = _
        ; events
        ; fired_events = _
        ; handle_fired = _
        ; is_wall_clock
        ; most_recently_fired = _
        ; scheduler = _
        }
    =
    if is_wall_clock
    then [%message "<wall_clock>"]
    else
      [%message
        (is_wall_clock : bool)
          (* We don't display the [Job.t]s in [events] because those are
             pool pointers, which are uninformative. *)
          (events : _ Timing_wheel.t)]
  ;;
end

open T1

module Read_write = struct
  type t = read_write T1.t [@@deriving sexp_of]

  let invariant = invariant
  let invariant_with_jobs = invariant_with_jobs
end

type t = read T1.t [@@deriving sexp_of]

let invariant = invariant
let invariant_with_jobs = invariant_with_jobs

(* Coerces any time source that has at least the [read] capability to the read-only
   type [t]. *)
let read_only (t : [> read ] T1.t) = (t :> t)
let create = Scheduler.create_time_source
let wall_clock = Scheduler.wall_clock
let alarm_precision t = Timing_wheel.alarm_precision t.events
let next_alarm_fires_at t = Timing_wheel.next_alarm_fires_at t.events
let timing_wheel_now t = Timing_wheel.now t.events
let id t = t.id

module Id = Synchronous_time_source0.Id

(* Current time of [t]: [Time_ns.now ()] for the wall clock, otherwise the timing
   wheel's current time. *)
let now t =
  if t.is_wall_clock
  then
    (* For the wall-clock time-source, we use [Time_ns.now ()] rather than
       [Timing_wheel.now t.events].  The latter is only updated at the start of each
       cycle.  There can be substantial difference between the two when people do long
       running computations or mix blocking code with async.  And humans expect that
       wall-clock time is based on [Time.now], not some artifact of async
       implementation. *)
    Time_ns.now ()
  else timing_wheel_now t
;;

(* We preallocate [send_exn] to avoid allocating it on each call to [advance_clock]. *)
let send_exn = Some Monitor.send_exn
let advance_directly t ~to_ = Synchronous_time_source0.advance_clock t ~to_ ~send_exn
let advance_directly_by t by = advance_directly t ~to_:(Time_ns.after (now t) by)
let advance = advance_directly
let advance_by = advance_directly_by
let fire_past_alarms t = Synchronous_time_source0.fire_past_alarms t ~send_exn
let yield t = Bvar.wait (Scheduler.yield t.scheduler)

(* Advances [t] to [to_] one alarm at a time: it advances to each intermediate alarm
   time strictly before [to_], waiting (via [wait_for], or [yield t] when [wait_for] is
   not supplied) after each step, and finally advances to [to_] itself. *)
let advance_by_alarms ?wait_for t ~to_ =
  let run_queued_alarms () =
    (* Every time we want to run queued alarms we need to yield control back to the
       [Async.Scheduler] and [wait_for] any logic that is supposed to finish at this time
       before advancing.  If no [wait_for] logic is specified we can simply yield control
       by invoking [yield t], which enqueues another job at the end of the scheduler job
       queue so alarm jobs have the opportunity to run before we advance. *)
    match wait_for with
    | None -> yield t
    | Some f -> f ()
  in
  let finish () =
    advance_directly t ~to_;
    fire_past_alarms t;
    (* so that alarms scheduled at or before [to_] fire *)
    run_queued_alarms ()
  in
  let rec walk_alarms () =
    match next_alarm_fires_at t with
    | None -> finish ()
    | Some next_alarm_fires_at ->
      if Time_ns.( >= ) next_alarm_fires_at to_
      then finish ()
      else (
        advance_directly t ~to_:next_alarm_fires_at;
        let%bind () = run_queued_alarms () in
        walk_alarms ())
  in
  (* This first [run_queued_alarms] call allows [Clock_ns.every] the opportunity to run
     its continuation deferreds so that they can reschedule alarms.  This is particularly
     useful in our "advance hits intermediate alarms" unit test below, but likely useful
     in other cases where [every] is synchronously followed by [advance]. *)
  let%bind () = run_queued_alarms () in
  walk_alarms ()
;;

let advance_by_alarms_by ?wait_for t by =
  advance_by_alarms ?wait_for t ~to_:(Time_ns.after (now t) by)
;;

let span_to_time t span = Time_ns.after (now t) span

(* Adds a job running [f a] in [execution_context] to the timing wheel at time [at],
   calls the scheduler's [event_added_hook] (if any) with [at], and returns the alarm. *)
let schedule_job t ~at execution_context f a =
  let alarm =
    Timing_wheel.add
      t.events
      ~at
      (Job_or_event.of_job (Scheduler.create_job t.scheduler execution_context f a))
  in
  (match t.scheduler.event_added_hook with
   | None -> ()
   | Some f -> f at);
  alarm
;;

(* Arranges for [f a] to run at [time] in the current execution context.  When [time]
   is strictly after the timing wheel's current time an alarm is added and returned;
   otherwise the job is enqueued on the scheduler right away and [Alarm.null ()] is
   returned. *)
let run_at_internal t time f a =
  let execution_context = Scheduler.current_execution_context t.scheduler in
  if Time_ns.( > ) time (Timing_wheel.now t.events)
  then schedule_job t ~at:time execution_context f a
  else (
    Scheduler.enqueue t.scheduler execution_context f a;
    Alarm.null ())
;;

let run_at t time f a = ignore (run_at_internal t time f a : _ Alarm.t)
let run_after t span f a = run_at t (span_to_time t span) f a

(* [at t time] is a deferred that is already determined when [time] is at or before the
   timing wheel's current time, and otherwise is filled by a job scheduled at [time].
   [fill] is defined outside the [fun] so that it is not re-created on each call. *)
let at =
  let fill result = Ivar.fill result () in
  fun t time ->
    if Time_ns.( <= ) time (Timing_wheel.now t.events)
    then return ()
    else (
      let result = Ivar.create () in
      ignore (run_at_internal t time fill result : _ Alarm.t);
      Ivar.read result)
;;

let after t span = at t (span_to_time t span)

(* Frees the job held by [alarm] and removes [alarm] from the timing wheel. *)
let remove_alarm t alarm : unit =
  let job_or_event = Alarm.value t.events alarm in
  (let open Job_or_event.Match in
   let (K k) = kind job_or_event in
   match k, project k job_or_event with
   | Job, job -> Scheduler.free_job t.scheduler job
   | Event, _ ->
     (* This is unreachable because [alarm] only ever comes from [Event.alarm] which only
        ever gets populated by a call to [schedule_job]. *)
     assert false);
  Timing_wheel.remove t.events alarm
;;

let remove_alarm_if_scheduled t alarm =
  if Timing_wheel.mem t.events alarm then remove_alarm t alarm
;;

module Event = struct
  module Fired = struct
    type ('a, 'h) t =
      | Aborted of 'a
      | Happened of 'h
    [@@deriving sexp_of]
  end

  type ('a, 'h) t =
    { mutable alarm : Job_or_event.t Alarm.t
    ; mutable fire : unit -> unit
    ; (* As long as [Ivar.is_empty fired], we have not yet committed to whether the event
         will happen or be aborted.  When [Ivar.is_empty fired], the alarm may or may not
         be in the timing wheel -- if it isn't, then there's a job in Async's job queue
         that will fire the event, unless it is aborted before that job can run. *)
      fired : ('a, 'h) Fired.t Ivar.t
    ; (* [num_fires_to_skip] is used to reschedule events that have fired and entered the
         Async job queue, but have not yet run.  Those jobs only run if [num_fires_to_skip
         = 0], and otherwise just decrement it.  So, to reschedule an event in such a
         state, we increment [num_fires_to_skip] and add a new alarm to the timing
         wheel. *)
      mutable num_fires_to_skip : int
    ; (* [scheduled_at] is the time at which [t] has most recently been scheduled to fire.
         While [t.alarm] is still in the timing wheel, this is the same as [Alarm.at
         t.alarm]. *)
      mutable scheduled_at : Time_ns.t
    ; time_source : Synchronous_time_source.t
    }
  [@@deriving fields, sexp_of]

  type t_unit = (unit, unit) t [@@deriving sexp_of]

  let fired t = Ivar.read t.fired

  let invariant invariant_a invariant_h t =
    Invariant.invariant [%here] t [%sexp_of: (_, _) t] (fun () ->
      let events = t.time_source.events in
      let check f = Invariant.check_field t f in
      Fields.iter
        ~alarm:
          (check (fun alarm ->
             if Ivar.is_full t.fired
             then assert (not (Timing_wheel.mem events alarm))
             else if Timing_wheel.mem events alarm
             then assert (Job_or_event.is_job (Alarm.value events alarm))))
        ~fire:ignore
        ~fired:
          (check (fun (fired : _ Fired.t Ivar.t) ->
             match Deferred.peek (Ivar.read fired) with
             | None -> ()
             | Some (Aborted a) -> invariant_a a
             | Some (Happened h) -> invariant_h h))
        ~num_fires_to_skip:
          (check (fun num_fires_to_skip -> assert (num_fires_to_skip >= 0)))
        ~scheduled_at:
          (check (fun scheduled_at ->
             if Timing_wheel.mem events t.alarm
             then
               [%test_result: Time_ns.t]
                 scheduled_at
                 ~expect:(Alarm.at events t.alarm)))
        ~time_source:ignore)
  ;;

  module Status = struct
    type ('a, 'h) t =
      | Aborted of 'a
      | Happened of 'h
      | Scheduled_at of Time_ns.t
    [@@deriving sexp_of]
  end

  let status t : _ Status.t =
    match Deferred.peek (Ivar.read t.fired) with
    | None -> Scheduled_at t.scheduled_at
    | Some (Aborted a) -> Aborted a
    | Some (Happened h) -> Happened h
  ;;

  module Abort_result = struct
    type ('a, 'h) t =
      | Ok
      | Previously_aborted of 'a
      | Previously_happened of 'h
    [@@deriving sexp_of]
  end

  (* If [t] has neither happened nor been aborted, fills [fired] with [Aborted a],
     removes the alarm from the timing wheel if it is still there, and returns [Ok].
     Otherwise reports the earlier outcome and changes nothing. *)
  let abort t a : _ Abort_result.t =
    if debug then Debug.log "Time_source.Event.abort" t [%sexp_of: (_, _) t];
    match Deferred.peek (fired t) with
    | Some (Aborted a) -> Previously_aborted a
    | Some (Happened h) -> Previously_happened h
    | None ->
      Ivar.fill t.fired (Aborted a);
      remove_alarm_if_scheduled t.time_source t.alarm;
      Ok
  ;;

  let abort_exn t a =
    match abort t a with
    | Ok -> ()
    | Previously_happened _ ->
      raise_s
        [%message "Clock.Event.abort_exn failed to abort event that previously happened"]
    | Previously_aborted _ ->
      raise_s
        [%message "Clock.Event.abort_exn failed to abort event that previously aborted"]
  ;;

  let abort_if_possible t a = ignore (abort t a : _ Abort_result.t)
  let schedule t = t.alarm <- run_at_internal t.time_source t.scheduled_at t.fire ()

  module Reschedule_result = struct
    type ('a, 'h) t =
      | Ok
      | Previously_aborted of 'a
      | Previously_happened of 'h
    [@@deriving sexp_of]
  end

  (* Moves a still-pending event to fire at [at].  The four cases below are keyed on
     (is [at] strictly after the timing wheel's current time, is the alarm still in the
     timing wheel). *)
  let reschedule_at t at : _ Reschedule_result.t =
    if debug
    then
      Debug.log
        "Time_source.Event.reschedule_at"
        (t, at)
        [%sexp_of: (_, _) t * Time_ns.t];
    match Deferred.peek (fired t) with
    | Some (Aborted a) -> Previously_aborted a
    | Some (Happened h) -> Previously_happened h
    | None ->
      let events = t.time_source.events in
      let is_in_timing_wheel = Timing_wheel.mem events t.alarm in
      let am_trying_to_reschedule_in_the_future =
        Time_ns.( > ) at (Timing_wheel.now events)
      in
      t.scheduled_at <- at;
      (match am_trying_to_reschedule_in_the_future, is_in_timing_wheel with
       | false, false -> ()
       | false, true ->
         t.time_source.handle_fired t.alarm;
         Timing_wheel.remove events t.alarm
       | true, false ->
         t.num_fires_to_skip <- t.num_fires_to_skip + 1;
         schedule t
       | true, true -> Timing_wheel.reschedule events t.alarm ~at);
      Ok
  ;;

  let reschedule_after t span = reschedule_at t (span_to_time t.time_source span)

  (* Creates an event that runs [f z] at [scheduled_at] and then, unless the event was
     aborted in the meantime, fills [fired] with [Happened] of the result. *)
  let run_at time_source scheduled_at f z =
    if debug then Debug.log "Time_source.Event.run_at" scheduled_at [%sexp_of: Time_ns.t];
    let t =
      { alarm = Alarm.null ()
      ; fire = ignore (* set below *)
      ; fired = Ivar.create ()
      ; num_fires_to_skip = 0
      ; scheduled_at
      ; time_source = read_only time_source
      }
    in
    let fire () =
      (* [fire] runs in an Async job.  The event may have been aborted after the job
         was enqueued, so [fire] must check [fired]. *)
      if Ivar.is_empty t.fired
      then
        if t.num_fires_to_skip > 0
        then t.num_fires_to_skip <- t.num_fires_to_skip - 1
        else (
          let result = f z in
          (* [f z] may have aborted the event, so we must check [fired] again. *)
          if Ivar.is_empty t.fired then Ivar.fill t.fired (Happened result))
    in
    t.fire <- fire;
    schedule t;
    t
  ;;

  let at time_source time = run_at time_source time ignore ()

  let run_after time_source span f a =
    run_at time_source (span_to_time time_source span) f a
  ;;

  let after time_source span = at time_source (span_to_time time_source span)
end

(* Extends a tail with [()] each time [at t (next_time ())] becomes determined, and
   closes the tail once [stop] is determined; returns the tail's stream. *)
let at_times ?(stop = Deferred.never ()) t next_time =
  let tail = Tail.create () in
  let rec loop () =
    choose
      [ choice stop (fun () -> `Stop); choice (at t (next_time ())) (fun () -> `Tick) ]
    >>> function
    | `Stop -> Tail.close_exn tail
    | `Tick ->
      Tail.extend tail ();
      loop ()
  in
  loop ();
  Tail.collect tail
;;

let at_varying_intervals ?stop t compute_span =
  at_times t ?stop (fun () -> Time_ns.after (now t) (compute_span ()))
;;

let at_intervals ?start ?stop t interval =
  let start =
    match start with
    | Some x -> x
    | None -> now t
  in
  at_times t ?stop (fun () ->
    Time_ns.next_multiple ~base:start ~after:(now t) ~interval ())
;;

module Continue = struct
  type t =
    | Immediately
    | After of Time_ns.Span.t
    | Next_multiple of Time_ns.t * Time_ns.Span.t

  let immediately = Immediately

  (* The time at which the next iteration should run under policy [t]. *)
  let at t time_source =
    match t with
    | Immediately -> Timing_wheel.now time_source.events
    | After span -> span_to_time time_source span
    | Next_multiple (base, interval) ->
      Time_ns.next_multiple ~base ~after:(now time_source) ~interval ()
  ;;
end

(* Once [start] is determined, runs [f] repeatedly, scheduling each subsequent run at
   [Continue.at continue t].  Iteration ends when [stop] is determined, at which point
   [finished] is filled.  With [continue_on_error] (the default), [f] is run under
   [Monitor.try_with] and any error is sent to the current monitor before continuing. *)
let run_repeatedly
      ?(start = return ())
      ?stop
      ?(continue_on_error = true)
      ?(finished = Ivar.create ())
      t
      ~f
      ~continue
  =
  start
  >>> fun () ->
  let alarm = ref (Alarm.null ()) in
  let stop =
    match stop with
    | None -> Deferred.never ()
    | Some stop ->
      upon stop (fun () ->
        if Timing_wheel.mem t.events !alarm
        then (
          remove_alarm t !alarm;
          Ivar.fill_if_empty finished ()));
      stop
  in
  (* [run_f], [continue_f], and [continue_try_with] are defined so that we allocate their
     closures once, not once per iteration. *)
  let rec run_f () =
    (* Before calling [f], we synchronously check whether [stop] is determined. *)
    if Deferred.is_determined stop
    then Ivar.fill_if_empty finished ()
    else if continue_on_error
    then Monitor.try_with f ~run:`Now ~rest:`Raise >>> continue_try_with
    else (
      let d = f () in
      if Deferred.is_determined d then continue_f () else d >>> continue_f)
  and continue_f () =
    if Deferred.is_determined stop
    then Ivar.fill_if_empty finished ()
    else alarm := run_at_internal t (Continue.at continue t) run_f ()
  and continue_try_with or_error =
    (match or_error with
     | Ok () -> ()
     | Error error -> Monitor.send_exn (Monitor.current ()) error);
    continue_f ()
  in
  run_f ()
;;

(* Raises if [span] is zero or negative. *)
let every' ?start ?stop ?continue_on_error ?finished t span f =
  if Time_ns.Span.( <= ) span Time_ns.Span.zero
  then
    raise_s [%message "Time_source.every got nonpositive span" (span : Time_ns.Span.t)];
  run_repeatedly t ?start ?stop ?continue_on_error ?finished ~f ~continue:(After span)
;;

let every ?start ?stop ?continue_on_error t span f =
  every' t ?start ?stop ?continue_on_error ?finished:None span (fun () ->
    f ();
    return ())
;;

let run_at_intervals' ?start ?stop ?continue_on_error t interval f =
  let now = now t in
  let base, start =
    match start with
    | None -> now, None
    | Some start ->
      ( start
      , Some
          (at
             t
             (Time_ns.next_multiple
                ()
                ~base:start
                ~after:now
                ~can_equal_after:true
                ~interval)) )
  in
  run_repeatedly
    t
    ?start
    ?stop
    ?continue_on_error
    ~f
    ~continue:(Next_multiple (base, interval))
;;

let run_at_intervals ?start ?stop ?continue_on_error t interval f =
  run_at_intervals' ?start ?stop ?continue_on_error t interval (fun () ->
    f ();
    return ())
;;

(* Races [d] against a timeout event scheduled [span] from now: yields [`Result v] if
   [d] is chosen (aborting the timeout event) and [`Timeout] if the event is chosen. *)
let with_timeout t span d =
  let timeout = Event.after t span in
  choose
    (* The code below does exhaustive case analysis in both [choice]s.  Because [timeout]
       does not escape the scope of this function, certain cases should be impossible, and
       are marked as such with exceptions.  We do not expect those exceptions to occur,
       but if they do, it likely indicates a bug in [choose] rather than
       [with_timeout]. *)
    [ choice d (fun v ->
        (match Event.abort timeout () with
         (* [Previously_happened] can occur if both [d] and [wait] become determined at
            the same time, e.g. [with_timeout (sec 0.) (return ())]. *)
         | Ok | Previously_happened () -> ()
         | Previously_aborted () ->
           raise_s [%message "Time_source.with_timeout bug: should only abort once"]);
        `Result v)
    ; choice (Event.fired timeout) (function
        | Happened () -> `Timeout
        | Aborted () ->
          raise_s
            [%message "Time_source.with_timeout bug: both completed and timed out"])
    ]
;;

let of_synchronous t = t
let to_synchronous t = t