Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file limiter_async.ml
open Core_kernel
open Async_kernel
open Limiter.Infinite_or_finite

(* The result delivered to callers of [enqueue']: the job's value, a notice that the job
   was aborted because the limiter was killed, or the exception the job raised. *)
module Outcome = struct
  type 'a t =
    | Ok of 'a
    | Aborted
    | Raised of exn
  [@@deriving sexp_of]
end

(* A unit of work waiting for tokens.

   [Immediate (monitor, f, v)] runs [f v] synchronously and reports exceptions to
   [monitor].  [Deferred (f, v, ivar)] runs [f v] under [Monitor.try_with] and reports
   the outcome through [ivar]. *)
module Job = struct
  type t =
    | Immediate : Monitor.t * ('a -> unit) * 'a -> t
    | Deferred : ('a -> 'b Deferred.t) * 'a * 'b Outcome.t Ivar.t -> t
end

module Expert = struct
  type t =
    { (* When [false], a [Deferred] job that raises kills the limiter (see
         [saw_error]). *)
      continue_on_error : bool
    ; (* [is_dead] is true if [t] was killed due to a job raising an exception or
         [kill t] being called. *)
      mutable is_dead : bool
    ; (* Ivar that is filled the next time return_to_hopper is called. *)
      mutable hopper_filled : unit Ivar.t option
    ; limiter : Limiter.t
    ; (* Jobs that could not start immediately, paired with their token cost, in
         arrival order. *)
      throttle_queue : ((int * Job.t) Queue.t[@sexp.opaque])
    }
  [@@deriving sexp_of]

  let to_jane_limiter t = t.limiter

  (* Every interaction with the underlying limiter uses the start of the current Async
     cycle as "now". *)
  let cycle_start () = Async_kernel_scheduler.cycle_start_ns ()

  (* Builds the underlying [Limiter.t] and wraps it with an empty job queue. *)
  let create_exn
        ~hopper_to_bucket_rate_per_sec
        ~bucket_limit
        ~in_flight_limit
        ~initial_bucket_level
        ~initial_hopper_level
        ~continue_on_error
    =
    let limiter =
      Limiter.Expert.create_exn
        ~now:(cycle_start ())
        ~hopper_to_bucket_rate_per_sec
        ~bucket_limit
        ~in_flight_limit
        ~initial_bucket_level
        ~initial_hopper_level
    in
    let throttle_queue = Queue.create () in
    { continue_on_error; is_dead = false; hopper_filled = None; limiter; throttle_queue }
  ;;

  let is_dead t = t.is_dead

  (* Tells the owner of [job] that it will never run: a [Deferred] job's ivar is filled
     with [Aborted] (unless already filled); an [Immediate] job's monitor is sent
     [Failure "Limiter killed"]. *)
  let kill_job = function
    | Job.Deferred (_, _, i) -> Ivar.fill_if_empty i Outcome.Aborted
    | Job.Immediate (monitor, _, _) ->
      Monitor.send_exn monitor ~backtrace:`Get (Failure "Limiter killed")
  ;;

  (* Marks [t] dead and aborts every queued job.  Does nothing if [t] is already dead. *)
  let kill t =
    if not t.is_dead
    then (
      t.is_dead <- true;
      Queue.iter t.throttle_queue ~f:(fun (_, job) -> kill_job job);
      (* Drop the aborted jobs.  Leaving them queued would let a pending queue runner
         dequeue and kill each one a second time (a duplicate exception for
         [Immediate] jobs) and would keep them counted as waiting. *)
      Queue.clear t.throttle_queue)
  ;;

  (* Called when a [Deferred] job raised. *)
  let saw_error t = if not t.continue_on_error then kill t

  (* Returns a deferred that becomes determined on the next [return_to_hopper] call.
     All waiters share one ivar. *)
  let wait_for_hopper_fill t =
    match t.hopper_filled with
    | Some i -> Ivar.read i
    | None ->
      let i = Ivar.create () in
      t.hopper_filled <- Some i;
      Ivar.read i
  ;;

  (* Wakes anything blocked in [wait_for_hopper_fill], then hands [amount] tokens back
     to the underlying limiter. *)
  let return_to_hopper t ~now amount =
    (match t.hopper_filled with
     | None -> ()
     | Some i ->
       Ivar.fill i ();
       t.hopper_filled <- None);
    Limiter.Expert.return_to_hopper t.limiter ~now amount
  ;;

  (* Runs [job] right away, assuming its tokens have already been taken.
     [return_after] tokens are returned to the hopper once the job finishes (for
     [Deferred] jobs, once the deferred is determined or the job raises).  If [t] is
     dead the job is killed instead and nothing is returned. *)
  let run_job_now t job ~return_after : unit =
    if t.is_dead
    then kill_job job
    else (
      match job with
      | Job.Immediate (monitor, f, v) ->
        (try f v with
         | e -> Monitor.send_exn monitor ~backtrace:`Get e);
        return_to_hopper t ~now:(cycle_start ()) return_after
      | Job.Deferred (f, v, i) ->
        Monitor.try_with (fun () -> f v)
        >>> fun res ->
        return_to_hopper t ~now:(cycle_start ()) return_after;
        (match res with
         | Error e ->
           Ivar.fill_if_empty i (Outcome.Raised e);
           saw_error t
         | Ok v -> Ivar.fill_if_empty i (Outcome.Ok v)))
  ;;

  (* given a job, immediately creates and runs a job that fails with the given (as a
     format string) message.  The replacement job reports to the same monitor / ivar
     as [job] and returns no tokens. *)
  let fail_job t job k =
    ksprintf
      (fun s ->
         let f () = failwith s in
         let job =
           match job with
           | Job.Immediate (monitor, _, _) -> Job.Immediate (monitor, f, ())
           | Job.Deferred (_, _, i) -> Job.Deferred (f, (), i)
         in
         run_job_now t job ~return_after:0)
      k
  ;;

  (* Starts queued jobs in order for as long as tokens are available.  When the head
     job cannot start yet, schedules itself to resume either when tokens are returned
     to the hopper or at the time the limiter predicts tokens will be available. *)
  let rec run_throttled_jobs_until_empty t =
    if Queue.is_empty t.throttle_queue
    then ()
    else (
      let amount, job = Queue.peek_exn t.throttle_queue in
      let now = cycle_start () in
      (* The head job can never be satisfied.  It must be removed before it is failed:
         otherwise the recursive call would peek the very same job again and loop
         forever. *)
      let fail_head_and_continue () =
        ignore (Queue.dequeue_exn t.throttle_queue : int * Job.t);
        fail_job
          t
          job
          !"job asked for more tokens (%i) than possible (%i)"
          amount
          (Limiter.bucket_limit t.limiter);
        run_throttled_jobs_until_empty t
      in
      match Limiter.Expert.try_take t.limiter ~now amount with
      | Asked_for_more_than_bucket_limit -> fail_head_and_continue ()
      | Taken ->
        (* Safe, because we checked the queue is non-empty above. And, we're guaranteed
           that dequeue_exn gets out the same job that peek_exn does. *)
        ignore (Queue.dequeue_exn t.throttle_queue : int * Job.t);
        run_job_now t job ~return_after:amount;
        run_throttled_jobs_until_empty t
      | Unable ->
        (match Limiter.Expert.tokens_may_be_available_when t.limiter ~now amount with
         | Never_because_greater_than_bucket_limit -> fail_head_and_continue ()
         | When_return_to_hopper_is_called ->
           wait_for_hopper_fill t >>> fun () -> run_throttled_jobs_until_empty t
         | At expected_fill_time ->
           (* Never wake up sooner than one scheduler event-precision from now. *)
           let min_fill_time =
             Time_ns.add (cycle_start ()) (Async_kernel_scheduler.event_precision_ns ())
           in
           Clock_ns.at (Time_ns.max expected_fill_time min_fill_time)
           >>> fun () -> run_throttled_jobs_until_empty t))
  ;;

  (* Entry point for every enqueue operation.

     - A job costing more than the bucket limit is failed immediately and is never
       queued.
     - If [t] is dead the job is killed.
     - If other jobs are already waiting the job joins the back of the queue.
     - Otherwise tokens are requested right away; the job either starts (immediately,
       or on the Async job queue, depending on [allow_immediate_run]) or becomes the
       first queued job and the queue runner is started. *)
  let enqueue_job_and_maybe_start_queue_runner t amount job ~allow_immediate_run =
    let bucket_limit = Limiter.bucket_limit t.limiter in
    let fail_oversized () =
      fail_job
        t
        job
        !"requested job size (%i) exceeds the possible size (%i)"
        amount
        bucket_limit
    in
    if bucket_limit < amount
    then
      (* Stop here: falling through would fail the job a second time, or put an
         unsatisfiable job on the queue. *)
      fail_oversized ()
    else if t.is_dead
    then kill_job job
    else if Queue.length t.throttle_queue > 0
    then Queue.enqueue t.throttle_queue (amount, job)
    else (
      let now = cycle_start () in
      match Limiter.Expert.try_take t.limiter ~now amount with
      | Asked_for_more_than_bucket_limit -> fail_oversized ()
      | Taken ->
        (* These semantics are copied from the current Throttle, and it was
           important enough there to add a specific unit test.  If you have

           do_f ();
           enqueue thing_to_do_later;
           do_g ();

           it is surprising if any portion of the closure thing_to_do_later happens, so
           we always schedule the work for later on the Async queue.

           This isn't as efficient as it could be for immediate jobs and can be avoided
           with [run_or_enqueue]. *)
        if allow_immediate_run
        then run_job_now t job ~return_after:amount
        else
          Async_kernel_scheduler.enqueue_job
            Execution_context.main
            (fun t -> run_job_now t job ~return_after:amount)
            t
      | Unable ->
        Queue.enqueue t.throttle_queue (amount, job);
        run_throttled_jobs_until_empty t)
  ;;

  (* Enqueues [f v] at a cost of [amount] tokens.  Exceptions raised by the job are
     sent to the monitor that is current at the time of this call. *)
  let enqueue_exn t ?(allow_immediate_run = false) amount f v =
    enqueue_job_and_maybe_start_queue_runner
      t
      amount
      ~allow_immediate_run
      (Job.Immediate (Monitor.current (), f, v))
  ;;

  (* Enqueues [f v] at a cost of [amount] tokens and returns a deferred carrying the
     job's [Outcome.t].  An exception raised synchronously while enqueueing is
     reported as [Raised]. *)
  let enqueue' t amount f v =
    Deferred.create (fun i ->
      try
        enqueue_job_and_maybe_start_queue_runner
          t
          amount
          (Job.Deferred (f, v, i))
          ~allow_immediate_run:false
      with
      | e -> Ivar.fill i (Outcome.Raised e))
  ;;

  (* Sum of the token costs of all jobs currently in the queue. *)
  let cost_of_jobs_waiting_to_start t =
    Queue.fold t.throttle_queue ~init:0 ~f:(fun sum (cost, _) -> cost + sum)
  ;;
end

open Expert

type t = Expert.t [@@deriving sexp_of]
type limiter = t [@@deriving sexp_of]

(* Operations shared by every limiter flavour below. *)
module Common = struct
  let to_limiter (t : t) = t
  let kill = Expert.kill
  let is_dead = Expert.is_dead
end

module type Common = sig
  type _ t

  (** kills [t], which aborts all enqueued jobs that haven't started and all jobs enqueued
      in the future.  If [t] has already been killed, then calling [kill t] has no effect.
      Note that kill does not affect currently running jobs in any way. *)
  val kill : _ t -> unit

  (** [is_dead t] returns [true] if [t] was killed, either by [kill] or by an unhandled
      exception in a job. *)
  val is_dead : _ t -> bool

  val to_limiter : _ t -> limiter
end

module Token_bucket = struct
  type t = limiter [@@deriving sexp_of]
  type _ u = t

  (* A limiter whose bucket holds at most [burst_size] tokens and refills at
     [sustained_rate_per_sec] from an infinite hopper.  [in_flight_limit] defaults to
     unlimited; the bucket starts with [initial_burst_size] tokens (default 0). *)
  let create_exn
        ~burst_size:bucket_limit
        ~sustained_rate_per_sec:fill_rate
        ~continue_on_error
        ?in_flight_limit
        ?(initial_burst_size = 0)
        ()
    =
    let in_flight_limit =
      match in_flight_limit with
      | None -> Infinite
      | Some limit -> Finite limit
    in
    Expert.create_exn
      ~bucket_limit
      ~in_flight_limit
      ~hopper_to_bucket_rate_per_sec:(Finite fill_rate)
      ~initial_bucket_level:initial_burst_size
      ~initial_hopper_level:Infinite
      ~continue_on_error
  ;;

  let enqueue_exn = Expert.enqueue_exn
  let enqueue' = Expert.enqueue'

  include Common
end

module Throttle = struct
  type t = limiter [@@deriving sexp_of]
  type _ u = t

  (* A limiter in which every job costs one token and at most
     [concurrent_jobs_target] jobs are in flight.  The bucket starts full; its size is
     [burst_size] when given, otherwise [concurrent_jobs_target].  The hopper starts
     empty, and moves tokens to the bucket at [sustained_rate_per_sec] when given,
     otherwise without a rate limit.

     Raises if [concurrent_jobs_target < 1]. *)
  let create_exn
        ~concurrent_jobs_target
        ~continue_on_error
        ?burst_size
        ?sustained_rate_per_sec
        ()
    =
    if concurrent_jobs_target < 1
    then
      failwithf
        !"concurrent_jobs_target < 1 (%i) doesn't make sense"
        concurrent_jobs_target
        ();
    let hopper_to_bucket_rate_per_sec =
      match sustained_rate_per_sec with
      | None -> Infinite
      | Some rate -> Finite rate
    in
    let bucket_limit =
      match burst_size with
      | None -> concurrent_jobs_target
      | Some burst_size -> burst_size
    in
    let initial_bucket_level = bucket_limit in
    Expert.create_exn
      ~bucket_limit
      ~in_flight_limit:(Finite concurrent_jobs_target)
      ~hopper_to_bucket_rate_per_sec
      ~initial_bucket_level
      ~initial_hopper_level:(Finite 0)
      ~continue_on_error
  ;;

  let enqueue_exn t ?allow_immediate_run f v =
    Expert.enqueue_exn t ?allow_immediate_run 1 f v
  ;;

  let enqueue' t f v = Expert.enqueue' t 1 f v
  let jlimiter = Expert.to_jane_limiter

  (* NOTE(review): this reports the bucket limit, which [create_exn] sets to
     [burst_size] when one is supplied; in that case it differs from the
     [concurrent_jobs_target] passed at creation.  Confirm this is intended. *)
  let concurrent_jobs_target t = jlimiter t |> Limiter.bucket_limit
  let num_jobs_waiting_to_start t = Queue.length t.throttle_queue

  let num_jobs_running t =
    Limiter.in_flight (jlimiter t) ~now:(Async_kernel_scheduler.cycle_start_ns ())
  ;;

  include Common
end

(* A [Throttle] that runs one job at a time. *)
module Sequencer = struct
  include Throttle

  let create ?(continue_on_error = false) ?burst_size ?sustained_rate_per_sec () =
    create_exn
      ~concurrent_jobs_target:1
      ~continue_on_error
      ?burst_size
      ?sustained_rate_per_sec
      ()
  ;;

  include Common
end

(* A [Throttle] whose concurrency equals the number of resources it was created with.
   Each job takes a resource from the pool when it starts and puts it back when it
   finishes. *)
module Resource_throttle = struct
  type 'a t =
    { throttle : Throttle.t
    ; resources : 'a Queue.t
    }
  [@@deriving sexp_of]

  let create_exn ~resources ~continue_on_error ?burst_size ?sustained_rate_per_sec () =
    let resources = Queue.of_list resources in
    let max_concurrent_jobs = Queue.length resources in
    let throttle =
      Throttle.create_exn
        ~concurrent_jobs_target:max_concurrent_jobs
        ~continue_on_error
        ?burst_size
        ?sustained_rate_per_sec
        ()
    in
    { throttle; resources }
  ;;

  (* Wraps [f] so that it takes a resource from the pool, runs, and returns the
     resource even if [f] raises; then submits the wrapped job through [enqueue]. *)
  let enqueue_gen t ?allow_immediate_run f enqueue =
    let f () =
      let v = Queue.dequeue_exn t.resources in
      protect ~f:(fun () -> f v) ~finally:(fun () -> Queue.enqueue t.resources v)
    in
    enqueue t.throttle ?allow_immediate_run f ()
  ;;

  let enqueue_exn t ?allow_immediate_run f =
    enqueue_gen t ?allow_immediate_run f Throttle.enqueue_exn
  ;;

  (* Deferred variant: the resource goes back to the pool once the job's deferred is
     determined or the job raises. *)
  let enqueue' t f =
    let f () =
      let v = Queue.dequeue_exn t.resources in
      Monitor.protect
        (fun () -> f v)
        ~finally:(fun () ->
          Queue.enqueue t.resources v;
          Deferred.unit)
    in
    Throttle.enqueue' t.throttle f ()
  ;;

  let max_concurrent_jobs t = Throttle.concurrent_jobs_target t.throttle
  let to_limiter t = t.throttle
  let kill t = kill t.throttle
  let is_dead t = is_dead t.throttle
end