Source file raw_scheduler.ml
open Core
open Import
module Fd = Raw_fd
module Watching = Fd.Watching
module Signal = Core.Signal
module Timerfd = Linux_ext.Timerfd
module Tsc = Time_stamp_counter

let debug = Debug.scheduler

module File_descr_watcher = struct
  (* A file descriptor watcher implementation + a watcher. We need the file-descr watcher
     as a first-class value to support choosing which file-descr watcher to use in
     [go_main]. We could define [t] as [Epoll of ... | Select of ...] and dispatch every
     call, but it is simpler to just pack the file descriptor watcher with its associated
     functions (OO-programming with modules...). *)
  module type S = sig
    include File_descr_watcher_intf.S

    val watcher : t
  end

  type t = (module S)

  let sexp_of_t t =
    let module F = (val t : S) in
    (* Include the backend information so we know which one it is. *)
    [%sexp_of: Config.File_descr_watcher.t * F.t] (F.backend, F.watcher)
  ;;
end
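(* A packed watcher is used by unpacking the first-class module locally wherever a
   backend operation is needed, roughly as in this sketch ([which_backend] is a
   hypothetical helper, not part of this module):

   {[
     let which_backend (w : File_descr_watcher.t) =
       let module F = (val w : File_descr_watcher.S) in
       F.backend
   ]} *)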
include Async_kernel_scheduler

type t =
  { (* The scheduler [mutex] must be locked by all code that is manipulating scheduler
       data structures, which is almost all async code. The [mutex] is automatically
       locked in the main thread when the scheduler is first created. A [Nano_mutex]
       keeps track of which thread is holding the lock. This means we can detect errors
       in which code incorrectly accesses async from a thread not holding the lock. We do
       this when [detect_invalid_access_from_thread = true]. We also detect errors in
       which code tries to acquire the async lock while it already holds it, or releases
       the lock when it doesn't hold it. *)
    mutex : Nano_mutex.t
  ; mutable is_running : bool
  ; mutable have_called_go : bool
  ; (* [fds_whose_watching_has_changed] holds all fds whose watching has changed since
       the last time their desired state was set in the [file_descr_watcher]. *)
    fds_whose_watching_has_changed : Fd.t Stack.t
  ; file_descr_watcher : File_descr_watcher.t
  ; mutable time_spent_waiting_for_io : Tsc.Span.t
  ; (* [fd_by_descr] holds every file descriptor that Async knows about. Fds are added
       when they are created, and removed when they transition to [Closed]. *)
    fd_by_descr : Fd_by_descr.t
  ; (* If we are using a file descriptor watcher that does not support sub-millisecond
       timeout, [timerfd] contains a timerfd used to handle the next expiration.
       [timerfd_set_at] holds the time at which [timerfd] is set to expire. This
       lets us avoid calling [Time_ns.now] and [Linux_ext.Timerfd.set_after] unless
       we need to change that time. *)
    mutable timerfd : Linux_ext.Timerfd.t option
  ; mutable timerfd_set_at : Time_ns.t
  ; (* A distinguished thread, called the "scheduler" thread, is continually looping,
       checking file descriptors for I/O and then running a cycle. It manages
       the [file_descr_watcher] and runs signal handlers.

       [scheduler_thread_id] is mutable because we create the scheduler before starting
       the scheduler running. Once we start running the scheduler, [scheduler_thread_id]
       is set and never changes again. *)
    mutable scheduler_thread_id : int
  ; (* The [interruptor] is used to wake up the scheduler when it is blocked on the file
       descriptor watcher. *)
    interruptor : Interruptor.t
  ; signal_manager : Raw_signal_manager.t
  ; (* The [thread_pool] is used for making blocking system calls in threads other than
       the scheduler thread, and for servicing [In_thread.run] requests. *)
    thread_pool : Thread_pool.t
  ; (* [handle_thread_pool_stuck] is called once per second if the thread pool is
       "stuck", i.e. has not completed a job for one second and has no available
       threads. *)
    mutable handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Time_ns.Span.t -> unit
  ; busy_pollers : Busy_pollers.t
  ; mutable busy_poll_thread_is_running : bool
  ; mutable next_tsc_calibration : Tsc.t
  ; kernel_scheduler : Kernel_scheduler.t
  ; (* [have_lock_do_cycle] is used to customize the implementation of running a cycle.
       E.g. in Ecaml it is set to something that causes Emacs to run a cycle. *)
    mutable have_lock_do_cycle : (unit -> unit) option (* configuration *)
  ; mutable max_inter_cycle_timeout : Max_inter_cycle_timeout.t
  ; mutable min_inter_cycle_timeout : Min_inter_cycle_timeout.t
  ; (* [initialized_at] is the call stack from when the scheduler was initialized. It's
       generally more confusing than useful if it's shown on crash, so we omit it from
       the sexp. *)
    initialized_at : (Backtrace.t[@sexp.opaque])
  }
[@@deriving fields, sexp_of]

let max_num_threads t = Thread_pool.max_num_threads t.thread_pool
let max_num_open_file_descrs t = Fd_by_descr.capacity t.fd_by_descr

let current_execution_context t =
  Kernel_scheduler.current_execution_context t.kernel_scheduler
;;

let with_execution_context t context ~f =
  Kernel_scheduler.with_execution_context t.kernel_scheduler context ~f
;;

let create_fd ?avoid_nonblock_if_possible t kind file_descr info =
  let fd = Fd.create ?avoid_nonblock_if_possible kind file_descr info in
  match Fd_by_descr.add t.fd_by_descr fd with
  | Ok () -> fd
  | Error error ->
    let backtrace = if am_running_inline_test then None else Some (Backtrace.get ()) in
    raise_s
      [%message
        "Async was unable to add a file descriptor to its table of open file descriptors"
          (file_descr : File_descr.t)
          (error : Error.t)
          (backtrace : (Backtrace.t option[@sexp.option]))
          ~scheduler:
            (if am_running_inline_test then None else Some t : (t option[@sexp.option]))]
;;

let thread_pool_cpu_affinity t = Thread_pool.cpu_affinity t.thread_pool

let lock t =
  (* The following debug message is outside the lock, and so there can be races between
     multiple threads printing this message. *)
  if debug then Debug.log_string "waiting on lock";
  Nano_mutex.lock_exn t.mutex
;;

let try_lock t =
  match Nano_mutex.try_lock_exn t.mutex with
  | `Acquired -> true
  | `Not_acquired -> false
;;

let unlock t =
  if debug then Debug.log_string "lock released";
  Nano_mutex.unlock_exn t.mutex
;;

let with_lock t f =
  lock t;
  protect ~f ~finally:(fun () -> unlock t)
;;

let am_holding_lock t = Nano_mutex.current_thread_has_lock t.mutex
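(* Code that touches scheduler state from outside the scheduler thread is expected to
   bracket the access with the lock, along the lines of this sketch (not an exported
   API; the body is a placeholder):

   {[
     with_lock t (fun () -> (* ... manipulate scheduler data structures ... *) ())
   ]}

   [am_holding_lock] is what [update_check_access] below relies on to detect accesses
   made without the lock. *)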
type the_one_and_only =
  | Not_ready_to_initialize
  | Ready_to_initialize of (unit -> t)
  | Initialized of t

(* We use a mutex to protect creation of the one-and-only scheduler in the event that
   multiple threads attempt to call [the_one_and_only] simultaneously, which can
   happen in programs that are using [Thread_safe.run_in_async]. *)
let mutex_for_initializing_the_one_and_only_ref = Nano_mutex.create ()
let the_one_and_only_ref : the_one_and_only ref = ref Not_ready_to_initialize

let is_ready_to_initialize () =
  match !the_one_and_only_ref with
  | Not_ready_to_initialize | Initialized _ -> false
  | Ready_to_initialize _ -> true
;;

(* Handling the uncommon cases in this function allows [the_one_and_only] to be inlined.
   The presence of a string constant keeps this function from being inlined. *)
let the_one_and_only_uncommon_case ~should_lock =
  Nano_mutex.critical_section mutex_for_initializing_the_one_and_only_ref ~f:(fun () ->
    match !the_one_and_only_ref with
    | Initialized t -> t
    | Not_ready_to_initialize ->
      raise_s [%message "Async the_one_and_only not ready to initialize"]
    | Ready_to_initialize f ->
      let t = f () in
      (* We supply [~should_lock:true] to lock the scheduler when the user does async
         stuff at the top level before calling [Scheduler.go], because we don't want
         anyone to be able to run jobs until [Scheduler.go] is called. This could happen,
         e.g. by creating a reader that does a read system call in another (true) thread.
         The scheduler will remain locked until the scheduler unlocks it. *)
      if should_lock then lock t;
      the_one_and_only_ref := Initialized t;
      t)
;;

let the_one_and_only ~should_lock =
  match !the_one_and_only_ref with
  | Initialized t -> t
  | Not_ready_to_initialize | Ready_to_initialize _ ->
    the_one_and_only_uncommon_case ~should_lock
;;
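(* The one-and-only scheduler thus moves through three states: [Not_ready_to_initialize]
   initially, [Ready_to_initialize create] once [init] below has run (it is called at
   module-initialization time), and [Initialized t] after the first call to
   [the_one_and_only], which creates the scheduler on demand. *)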
let current_thread_id () = Core.Thread.(id (self ()))
let is_main_thread () = current_thread_id () = 0
let remove_fd t fd = Fd_by_descr.remove t.fd_by_descr fd

let maybe_start_closing_fd t (fd : Fd.t) =
  if fd.num_active_syscalls = 0
  then (
    match fd.state with
    | Closed | Open _ -> ()
    | Close_requested (execution_context, do_close_syscall) ->
      (* We must remove the fd now and not after the close has finished. If we waited
         until after the close had finished, then the fd might have already been
         reused by the OS and replaced. *)
      remove_fd t fd;
      Fd.set_state fd Closed;
      Kernel_scheduler.enqueue t.kernel_scheduler execution_context do_close_syscall ())
;;

let dec_num_active_syscalls_fd t (fd : Fd.t) =
  fd.num_active_syscalls <- fd.num_active_syscalls - 1;
  maybe_start_closing_fd t fd
;;

let invariant t : unit =
  try
    let check invariant field = invariant (Field.get field t) in
    Fields.iter
      ~mutex:ignore
      ~have_lock_do_cycle:ignore
      ~is_running:ignore
      ~have_called_go:ignore
      ~fds_whose_watching_has_changed:
        (check (fun fds_whose_watching_has_changed ->
           Stack.iter fds_whose_watching_has_changed ~f:(fun (fd : Fd.t) ->
             assert fd.watching_has_changed;
             match Fd_by_descr.find t.fd_by_descr fd.file_descr with
             | None -> assert false
             | Some fd' -> assert (phys_equal fd fd'))))
      ~file_descr_watcher:
        (check (fun file_descr_watcher ->
           let module F = (val file_descr_watcher : File_descr_watcher.S) in
           F.invariant F.watcher;
           F.iter F.watcher ~f:(fun file_descr _ ->
             try
               match Fd_by_descr.find t.fd_by_descr file_descr with
               | None -> raise_s [%message "missing from fd_by_descr"]
               | Some fd -> assert (Fd.num_active_syscalls fd > 0)
             with
             | exn ->
               raise_s [%message "fd problem" (exn : exn) (file_descr : File_descr.t)])))
      ~time_spent_waiting_for_io:ignore
      ~fd_by_descr:
        (check (fun fd_by_descr ->
           Fd_by_descr.invariant fd_by_descr;
           Fd_by_descr.iter fd_by_descr ~f:(fun fd ->
             if fd.watching_has_changed
             then
               assert (
                 Stack.exists t.fds_whose_watching_has_changed ~f:(fun fd' ->
                   phys_equal fd fd')))))
      ~timerfd:ignore
      ~timerfd_set_at:ignore
      ~scheduler_thread_id:ignore
      ~interruptor:(check Interruptor.invariant)
      ~signal_manager:(check Raw_signal_manager.invariant)
      ~thread_pool:(check Thread_pool.invariant)
      ~handle_thread_pool_stuck:ignore
      ~busy_pollers:(check Busy_pollers.invariant)
      ~busy_poll_thread_is_running:ignore
      ~next_tsc_calibration:ignore
      ~kernel_scheduler:(check Kernel_scheduler.invariant)
      ~max_inter_cycle_timeout:ignore
      ~min_inter_cycle_timeout:
        (check (fun min_inter_cycle_timeout ->
           assert (
             Time_ns.Span.( <= )
               (Min_inter_cycle_timeout.raw min_inter_cycle_timeout)
               (Max_inter_cycle_timeout.raw t.max_inter_cycle_timeout))))
      ~initialized_at:ignore
  with
  | exn -> raise_s [%message "Scheduler.invariant failed" (exn : exn) ~scheduler:(t : t)]
;;

let update_check_access t do_check =
  Kernel_scheduler.set_check_access
    t.kernel_scheduler
    (if not do_check
     then None
     else
       Some
         (fun () ->
           if not (am_holding_lock t)
           then (
             Debug.log
               "attempt to access Async from thread not holding the Async lock"
               (Backtrace.get (), t, Time.now ())
               [%sexp_of: Backtrace.t * t * Time.t];
             exit 1)))
;;
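(* The check installed above is enabled by [Config.detect_invalid_access_from_thread]
   (like the other [Config] values, typically set through the ASYNC_CONFIG environment
   variable); when enabled, touching Async from a thread that doesn't hold the Async
   lock logs a backtrace and exits the process. *)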
(* Try to create a timerfd. It returns [None] if [Core] is not built with timerfd support
   or if it is not available on the current system. *)
let try_create_timerfd () =
  match Timerfd.create with
  | Error _ -> None
  | Ok create ->
    let clock = Timerfd.Clock.realtime in
    (try Some (create clock ~flags:Timerfd.Flags.(nonblock + cloexec)) with
     | Unix.Unix_error (ENOSYS, _, _) ->
       (* Kernel too old. *)
       None
     | Unix.Unix_error (EINVAL, _, _) ->
       (* Flags are only supported with Linux >= 2.6.27, try without them. *)
       let timerfd = create clock in
       Unix.set_close_on_exec (timerfd : Timerfd.t :> Unix.File_descr.t);
       Unix.set_nonblock (timerfd : Timerfd.t :> Unix.File_descr.t);
       Some timerfd)
;;

let default_handle_thread_pool_stuck thread_pool ~stuck_for =
  if Time_ns.Span.( >= ) stuck_for Config.report_thread_pool_stuck_for
  then (
    let should_abort =
      Time_ns.Span.( >= ) stuck_for Config.abort_after_thread_pool_stuck_for
    in
    let text = "Async's thread pool is stuck" in
    let text =
      if should_abort
      then text
      else
        sprintf
          "%s, and will raise an exception in %s"
          text
          (Time_ns.Span.to_short_string
             (Time_ns.Span.( - ) Config.abort_after_thread_pool_stuck_for stuck_for))
    in
    let message =
      [%message
        ""
          ~_:((if am_running_test then Time_ns.epoch else Time_ns.now ()) : Time_ns.t)
          text
          ~stuck_for:(Time_ns.Span.to_short_string stuck_for : string)
          ~num_threads_created:(Thread_pool.num_threads thread_pool : int)
          ~max_num_threads:(Thread_pool.max_num_threads thread_pool : int)
          ~last_thread_creation_failure:
            (Thread_pool.last_thread_creation_failure thread_pool
              : (Sexp.t option[@sexp.option]))]
    in
    if should_abort
    then Monitor.send_exn Monitor.main (Error.to_exn (Error.create_s message))
    else Core.Debug.eprint_s message)
;;

let detect_stuck_thread_pool t =
  let is_stuck = ref false in
  let became_stuck_at = ref Time_ns.epoch in
  let stuck_num_work_completed = ref 0 in
  Clock.every (sec 1.) ~continue_on_error:false (fun () ->
    if not (Thread_pool.has_unstarted_work t.thread_pool)
    then is_stuck := false
    else (
      let now = Time_ns.now () in
      let num_work_completed = Thread_pool.num_work_completed t.thread_pool in
      if !is_stuck && num_work_completed = !stuck_num_work_completed
      then
        t.handle_thread_pool_stuck
          t.thread_pool
          ~stuck_for:(Time_ns.diff now !became_stuck_at)
      else (
        is_stuck := true;
        became_stuck_at := now;
        stuck_num_work_completed := num_work_completed)))
;;

let thread_safe_wakeup_scheduler t = Interruptor.thread_safe_interrupt t.interruptor
let i_am_the_scheduler t = current_thread_id () = t.scheduler_thread_id

let set_fd_desired_watching t (fd : Fd.t) read_or_write desired =
  Read_write.set fd.watching read_or_write desired;
  if not fd.watching_has_changed
  then (
    fd.watching_has_changed <- true;
    Stack.push t.fds_whose_watching_has_changed fd)
;;

let request_start_watching t fd read_or_write watching =
  if Debug.file_descr_watcher
  then
    Debug.log
      "request_start_watching"
      (read_or_write, fd, t)
      [%sexp_of: Read_write.Key.t * Fd.t * t];
  if not fd.supports_nonblock
     (* Some versions of epoll complain if one asks it to monitor a file descriptor that
        doesn't support nonblocking I/O, e.g. a file. So, we never ask the
        file-descr-watcher to monitor such descriptors. *)
  then `Unsupported
  else (
    let result =
      match Read_write.get fd.watching read_or_write with
      | Watch_once _ | Watch_repeatedly _ -> `Already_watching
      | Stop_requested ->
        (* We don't [inc_num_active_syscalls] in this case, because we already did when
           we transitioned from [Not_watching] to [Watching]. Also, it is possible that
           [fd] was closed since we transitioned to [Stop_requested], in which case we
           don't want to [start_watching]; we want to report that it was closed and leave
           it [Stop_requested] so the file-descr-watcher will stop watching it and we can
           actually close it. *)
        if Fd.is_closed fd then `Already_closed else `Watching
      | Not_watching ->
        (match Fd.inc_num_active_syscalls fd with
         | `Already_closed -> `Already_closed
         | `Ok -> `Watching)
    in
    (match result with
     | `Already_closed | `Already_watching -> ()
     | `Watching ->
       set_fd_desired_watching t fd read_or_write watching;
       if not (i_am_the_scheduler t) then thread_safe_wakeup_scheduler t);
    result)
;;

let request_stop_watching t fd read_or_write value =
  if Debug.file_descr_watcher
  then
    Debug.log
      "request_stop_watching"
      (read_or_write, value, fd, t)
      [%sexp_of: Read_write.Key.t * Fd.ready_to_result * Fd.t * t];
  match Read_write.get fd.watching read_or_write with
  | Stop_requested | Not_watching -> ()
  | Watch_once ready_to ->
    Ivar.fill ready_to value;
    set_fd_desired_watching t fd read_or_write Stop_requested;
    if not (i_am_the_scheduler t) then thread_safe_wakeup_scheduler t
  | Watch_repeatedly (job, finished) ->
    (match value with
     | `Ready -> Kernel_scheduler.enqueue_job t.kernel_scheduler job ~free_job:false
     | (`Closed | `Bad_fd | `Interrupted) as value ->
       Ivar.fill finished value;
       set_fd_desired_watching t fd read_or_write Stop_requested;
       if not (i_am_the_scheduler t) then thread_safe_wakeup_scheduler t)
;;
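(* A rough summary of the per-fd watching state machine driven by the two functions
   above (the states live on [fd.watching]):

     Not_watching   --[request_start_watching]--------------------> Watch_once / Watch_repeatedly
     Watch_*        --[request_stop_watching]---------------------> Stop_requested
     Stop_requested --[sync_changed_fds_to_file_descr_watcher]----> Not_watching

   The transition back to [Not_watching] happens below in
   [sync_changed_fds_to_file_descr_watcher], which also decrements the fd's
   active-syscall count. *)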
let[@cold] post_check_got_timerfd file_descr =
  raise_s
    [%message
      "File_descr_watcher returned the timerfd as ready to be written to"
        (file_descr : File_descr.t)]
;;

let[@cold] post_check_invalid_fd file_descr =
  raise_s
    [%message "File_descr_watcher returned unknown file descr" (file_descr : File_descr.t)]
;;

let post_check_handle_fd t file_descr read_or_write value =
  if Fd_by_descr.mem t.fd_by_descr file_descr
  then (
    let fd = Fd_by_descr.find_exn t.fd_by_descr file_descr in
    request_stop_watching t fd read_or_write value)
  else (
    match t.timerfd with
    | Some tfd when File_descr.equal file_descr (tfd :> Unix.File_descr.t) ->
      (match read_or_write with
       | `Read ->
         (* We don't need to actually call [read] since we are using the
            edge-triggered behavior. *)
         ()
       | `Write -> post_check_got_timerfd file_descr)
    | _ -> post_check_invalid_fd file_descr)
;;

let create
      ?(thread_pool_cpu_affinity = Config.thread_pool_cpu_affinity)
      ?(file_descr_watcher = Config.file_descr_watcher)
      ?(max_num_open_file_descrs = Config.max_num_open_file_descrs)
      ?(max_num_threads = Config.max_num_threads)
      ()
  =
  if debug then Debug.log_string "creating scheduler";
  let thread_pool =
    ok_exn
      (Thread_pool.create
         ()
         ~cpu_affinity:thread_pool_cpu_affinity
         ~max_num_threads:(Max_num_threads.raw max_num_threads))
  in
  let num_file_descrs = Max_num_open_file_descrs.raw max_num_open_file_descrs in
  let fd_by_descr = Fd_by_descr.create ~num_file_descrs in
  let create_fd kind file_descr info =
    let fd = Fd.create kind file_descr info in
    ok_exn (Fd_by_descr.add fd_by_descr fd);
    fd
  in
  let interruptor = Interruptor.create ~create_fd in
  let t_ref = ref None in
  (* set below, after [t] is defined *)
  let handle_fd read_or_write ready_or_bad_fd file_descr =
    match !t_ref with
    | None -> assert false
    | Some t -> post_check_handle_fd t file_descr read_or_write ready_or_bad_fd
  in
  let handle_fd_read_ready = handle_fd `Read `Ready in
  let handle_fd_read_bad = handle_fd `Read `Bad_fd in
  let handle_fd_write_ready = handle_fd `Write `Ready in
  let handle_fd_write_bad = handle_fd `Write `Bad_fd in
  let file_descr_watcher, timerfd =
    match file_descr_watcher with
    | Select ->
      let watcher =
        Select_file_descr_watcher.create
          ~num_file_descrs
          ~handle_fd_read_ready
          ~handle_fd_read_bad
          ~handle_fd_write_ready
          ~handle_fd_write_bad
      in
      let module W = struct
        include Select_file_descr_watcher

        let watcher = watcher
      end
      in
      (module W : File_descr_watcher.S), None
    | Epoll | Epoll_if_timerfd ->
      let timerfd =
        match try_create_timerfd () with
        | None ->
          raise_s
            [%message
              {|Async refuses to run using epoll on a system that doesn't support timer FDs, since
Async will be unable to timeout with sub-millisecond precision.|}]
        | Some timerfd -> timerfd
      in
      let watcher =
        Epoll_file_descr_watcher.create
          ~num_file_descrs
          ~timerfd
          ~handle_fd_read_ready
          ~handle_fd_write_ready
      in
      let module W = struct
        include Epoll_file_descr_watcher

        let watcher = watcher
      end
      in
      (module W : File_descr_watcher.S), Some timerfd
  in
  let kernel_scheduler = Kernel_scheduler.t () in
  let t =
    { mutex = Nano_mutex.create ()
    ; is_running = false
    ; have_called_go = false
    ; fds_whose_watching_has_changed = Stack.create ()
    ; file_descr_watcher
    ; time_spent_waiting_for_io = Tsc.Span.of_int_exn 0
    ; fd_by_descr
    ; timerfd
    ; timerfd_set_at = Time_ns.max_value_for_1us_rounding
    ; scheduler_thread_id = -1 (* set when [be_the_scheduler] is called *)
    ; interruptor
    ; signal_manager =
        Raw_signal_manager.create ~thread_safe_notify_signal_delivered:(fun () ->
          Interruptor.thread_safe_interrupt interruptor)
    ; thread_pool
    ; handle_thread_pool_stuck = default_handle_thread_pool_stuck
    ; busy_pollers = Busy_pollers.create ()
    ; busy_poll_thread_is_running = false
    ; next_tsc_calibration = Tsc.now ()
    ; kernel_scheduler
    ; have_lock_do_cycle = None
    ; max_inter_cycle_timeout = Config.max_inter_cycle_timeout
    ; min_inter_cycle_timeout = Config.min_inter_cycle_timeout
    ; initialized_at = Backtrace.get ()
    }
  in
  t_ref := Some t;
  detect_stuck_thread_pool t;
  update_check_access t Config.detect_invalid_access_from_thread;
  t
;;

let init () = the_one_and_only_ref := Ready_to_initialize (fun () -> create ())
let () = init ()

let reset_in_forked_process () =
  (match !the_one_and_only_ref with
   | Not_ready_to_initialize | Ready_to_initialize _ -> ()
   | Initialized { file_descr_watcher; timerfd; _ } ->
     let module F = (val file_descr_watcher : File_descr_watcher.S) in
     F.reset_in_forked_process F.watcher;
     (match timerfd with
      | None -> ()
      | Some tfd -> Unix.close (tfd :> Unix.File_descr.t)));
  Kernel_scheduler.reset_in_forked_process ();
  init ()
;;

(* [thread_safe_reset] shuts down Async, exiting the scheduler thread, freeing up all
   Async resources (file descriptors, threads), and resetting Async global state, so that
   one can recreate a new Async scheduler afterwards. [thread_safe_reset] blocks until
   the shutdown is complete; it must be called from outside Async, e.g. the main
   thread. *)
let thread_safe_reset () =
  match !the_one_and_only_ref with
  | Not_ready_to_initialize | Ready_to_initialize _ -> ()
  | Initialized t ->
    assert (not (am_holding_lock t));
    Thread_pool.finished_with t.thread_pool;
    Thread_pool.block_until_finished t.thread_pool;
    (* We now schedule a job that, when it runs, exits the scheduler thread. We then wait
       for that job to run. We acquire the Async lock so that we can schedule the job,
       but release it before we block, so that the scheduler can acquire it. *)
    let scheduler_thread_finished = Thread_safe_ivar.create () in
    with_lock t (fun () ->
      schedule (fun () ->
        Thread_safe_ivar.fill scheduler_thread_finished ();
        Thread.exit ()));
    Thread_safe_ivar.read scheduler_thread_finished;
    reset_in_forked_process ()
;;

let make_async_unusable () =
  reset_in_forked_process ();
  Kernel_scheduler.make_async_unusable ();
  the_one_and_only_ref
  := Ready_to_initialize
       (fun () ->
          raise_s [%sexp "Async is unusable due to [Scheduler.make_async_unusable]"])
;;

let thread_safe_enqueue_external_job t f =
  Kernel_scheduler.thread_safe_enqueue_external_job t.kernel_scheduler f
;;

let have_lock_do_cycle t =
  if debug then Debug.log "have_lock_do_cycle" t [%sexp_of: t];
  match t.have_lock_do_cycle with
  | Some f -> f ()
  | None ->
    Kernel_scheduler.run_cycle t.kernel_scheduler;
    (* If we are not the scheduler, wake it up so it can process any remaining jobs, clock
       events, or an unhandled exception. *)
    if not (i_am_the_scheduler t) then thread_safe_wakeup_scheduler t
;;

let[@cold] log_sync_changed_fds_to_file_descr_watcher t file_descr desired =
  let module F = (val t.file_descr_watcher : File_descr_watcher.S) in
  Debug.log
    "File_descr_watcher.set"
    (file_descr, desired, F.watcher)
    [%sexp_of: File_descr.t * bool Read_write.t * F.t]
;;

let[@cold] sync_changed_fd_failed t fd desired exn =
  raise_s
    [%message
      "sync_changed_fds_to_file_descr_watcher unable to set fd"
        (desired : bool Read_write.t)
        (fd : Fd.t)
        (exn : exn)
        ~scheduler:(t : t)]
;;

let sync_changed_fds_to_file_descr_watcher t =
  (* We efficiently do nothing if nothing has changed, avoiding even the definition of
     [module F], which can have some cost. *)
  if not (Stack.is_empty t.fds_whose_watching_has_changed)
  then
    let module F = (val t.file_descr_watcher : File_descr_watcher.S) in
    while not (Stack.is_empty t.fds_whose_watching_has_changed) do
      let fd = Stack.pop_exn t.fds_whose_watching_has_changed in
      fd.watching_has_changed <- false;
      let desired =
        Read_write.map fd.watching ~f:(fun watching ->
          match watching with
          | Watch_once _ | Watch_repeatedly _ -> true
          | Not_watching | Stop_requested -> false)
      in
      if Debug.file_descr_watcher
      then log_sync_changed_fds_to_file_descr_watcher t fd.file_descr desired;
      (try F.set F.watcher fd.file_descr desired with
       | exn -> sync_changed_fd_failed t fd desired exn);
      (* We modify Async's data structures after calling [F.set], so that
         the error message produced by [sync_changed_fd_failed] displays
         them as they were before the call. *)
      Read_write.iteri fd.watching ~f:(fun read_or_write watching ->
        match watching with
        | Watch_once _ | Watch_repeatedly _ | Not_watching -> ()
        | Stop_requested ->
          Read_write.set fd.watching read_or_write Not_watching;
          dec_num_active_syscalls_fd t fd)
    done
;;

let maybe_calibrate_tsc t =
  if Lazy.is_val Tsc.calibrator
  then (
    let now = Tsc.now () in
    if Tsc.( >= ) now t.next_tsc_calibration
    then (
      let calibrator = force Tsc.calibrator in
      Tsc.Calibrator.calibrate calibrator;
      t.next_tsc_calibration
      <- Tsc.add now (Tsc.Span.of_ns (Int63.of_int 1_000_000_000) ~calibrator)))
;;

let create_job ?execution_context t f x =
  let execution_context =
    match execution_context with
    | Some e -> e
    | None -> current_execution_context t
  in
  Kernel_scheduler.create_job t.kernel_scheduler execution_context f x
;;

let dump_core_on_job_delay () =
  match Config.dump_core_on_job_delay with
  | Do_not_watch -> ()
  | Watch { dump_if_delayed_by; how_to_dump } ->
    Dump_core_on_job_delay.start_watching
      ~dump_if_delayed_by:(Time_ns.Span.to_span_float_round_nearest dump_if_delayed_by)
      ~how_to_dump
;;

let init t =
  dump_core_on_job_delay ();
  Kernel_scheduler.set_thread_safe_external_job_hook t.kernel_scheduler (fun () ->
    thread_safe_wakeup_scheduler t);
  t.scheduler_thread_id <- current_thread_id ();
  (* We handle [Signal.pipe] so that write() calls on a closed pipe/socket get EPIPE but
     the process doesn't die due to an unhandled SIGPIPE. *)
  Raw_signal_manager.manage t.signal_manager Signal.pipe;
  let interruptor_finished = Ivar.create () in
  let interruptor_read_fd = Interruptor.read_fd t.interruptor in
  let problem_with_interruptor () =
    raise_s
      [%message
        "can not watch interruptor" (interruptor_read_fd : Fd.t) ~scheduler:(t : t)]
  in
  (match
     request_start_watching
       t
       interruptor_read_fd
       `Read
       (Watch_repeatedly
          ( Kernel_scheduler.create_job
              t.kernel_scheduler
              Execution_context.main
              Fn.ignore
              ()
          , interruptor_finished ))
   with
   | `Already_watching | `Watching -> ()
   | `Unsupported | `Already_closed -> problem_with_interruptor ());
  upon (Ivar.read interruptor_finished) (fun _ -> problem_with_interruptor ())
;;

(* We avoid allocation in [check_file_descr_watcher], since it is called every time in
   the scheduler loop. *)
let check_file_descr_watcher t ~timeout span_or_unit =
  let module F = (val t.file_descr_watcher : File_descr_watcher.S) in
  if Debug.file_descr_watcher
  then Debug.log "File_descr_watcher.pre_check" t [%sexp_of: t];
  let pre = F.pre_check F.watcher in
  unlock t;
  (* We yield so that other OCaml threads (especially thread-pool threads) get a chance
     to run. This is a good point to yield, because we do not hold the Async lock, which
     allows other threads to acquire it. [Thread.yield] only yields if other OCaml
     threads are waiting to acquire the OCaml lock, and is fast if not. As of OCaml 4.07,
     [Thread.yield] on Linux calls [nanosleep], which causes the Linux scheduler to
     actually switch to other threads. *)
  Thread.yield ();
  if Debug.file_descr_watcher
  then
    Debug.log
      "File_descr_watcher.thread_safe_check"
      (File_descr_watcher_intf.Timeout.variant_of timeout span_or_unit, t)
      [%sexp_of: [ `Never | `Immediately | `After of Time_ns.Span.t ] * t];
  let before = Tsc.now () in
  let check_result = F.thread_safe_check F.watcher pre timeout span_or_unit in
  let after = Tsc.now () in
  t.time_spent_waiting_for_io
  <- Tsc.Span.( + ) t.time_spent_waiting_for_io (Tsc.diff after before);
  lock t;
  (* We call [Interruptor.clear] after [thread_safe_check] and before any of the
     processing that needs to happen in response to [thread_safe_interrupt]. That
     way, even if [Interruptor.clear] clears out an interrupt that hasn't been
     serviced yet, the interrupt will still be serviced by the immediately following
     processing. *)
  Interruptor.clear t.interruptor;
  if Debug.file_descr_watcher
  then
    Debug.log
      "File_descr_watcher.post_check"
      (check_result, t)
      [%sexp_of: F.Check_result.t * t];
  F.post_check F.watcher check_result
;;

(* We compute the timeout as the last thing before [check_file_descr_watcher], because
   we want to make sure the timeout is zero if there are any scheduled jobs. The code
   is structured to avoid calling [Time_ns.now] and [Linux_ext.Timerfd.set_*] if
   possible. In particular, we only call [Time_ns.now] if we need to compute the
   timeout-after span. And we only call [Linux_ext.Timerfd.set_after] if the time that
   we want it to fire is different than the time it is already set to fire. *)
let compute_timeout_and_check_file_descr_watcher t =
  let min_inter_cycle_timeout = (t.min_inter_cycle_timeout :> Time_ns.Span.t) in
  let max_inter_cycle_timeout = (t.max_inter_cycle_timeout :> Time_ns.Span.t) in
  let file_descr_watcher_timeout =
    match t.timerfd with
    | None ->
      (* Since there is no timerfd, use the file descriptor watcher timeout. *)
      if Kernel_scheduler.can_run_a_job t.kernel_scheduler
      then min_inter_cycle_timeout
      else if not (Kernel_scheduler.has_upcoming_event t.kernel_scheduler)
      then max_inter_cycle_timeout
      else (
        let next_event_at = Kernel_scheduler.next_upcoming_event_exn t.kernel_scheduler in
        Time_ns.Span.min
          max_inter_cycle_timeout
          (Time_ns.Span.max
             min_inter_cycle_timeout
             (Time_ns.diff next_event_at (Time_ns.now ()))))
    | Some timerfd ->
      (* Set [timerfd] to fire if necessary, taking into account [can_run_a_job],
         [min_inter_cycle_timeout], and [next_event_at]. *)
      let have_min_inter_cycle_timeout =
        Time_ns.Span.( > ) min_inter_cycle_timeout Time_ns.Span.zero
      in
      if Kernel_scheduler.can_run_a_job t.kernel_scheduler
      then
        if not have_min_inter_cycle_timeout
        then Time_ns.Span.zero
        else (
          t.timerfd_set_at <- Time_ns.max_value_for_1us_rounding;
          Linux_ext.Timerfd.set_after timerfd min_inter_cycle_timeout;
          max_inter_cycle_timeout)
      else if not (Kernel_scheduler.has_upcoming_event t.kernel_scheduler)
      then max_inter_cycle_timeout
      else (
        let next_event_at = Kernel_scheduler.next_upcoming_event_exn t.kernel_scheduler in
        let set_timerfd_at =
          if not have_min_inter_cycle_timeout
          then next_event_at
          else
            Time_ns.max next_event_at (Time_ns.add (Time_ns.now ()) min_inter_cycle_timeout)
        in
        if not (Time_ns.equal t.timerfd_set_at set_timerfd_at)
        then (
          t.timerfd_set_at <- set_timerfd_at;
          Linux_ext.Timerfd.set_at timerfd set_timerfd_at);
        max_inter_cycle_timeout)
  in
  if Time_ns.Span.( <= ) file_descr_watcher_timeout Time_ns.Span.zero
  then check_file_descr_watcher t ~timeout:Immediately ()
  else check_file_descr_watcher t ~timeout:After file_descr_watcher_timeout
;;
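(* For example, when there is no timerfd: if a job can run right away, the watcher
   timeout is [min_inter_cycle_timeout]; if there is no upcoming clock event, it is
   [max_inter_cycle_timeout]; otherwise it is the span until the next upcoming event,
   clamped between [min_inter_cycle_timeout] and [max_inter_cycle_timeout]. *)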
let one_iter t =
  maybe_calibrate_tsc t;
  sync_changed_fds_to_file_descr_watcher t;
  compute_timeout_and_check_file_descr_watcher t;
  if debug then Debug.log_string "handling delivered signals";
  Raw_signal_manager.handle_delivered t.signal_manager;
  have_lock_do_cycle t
;;

let be_the_scheduler ?(raise_unhandled_exn = false) t =
  init t;
  let rec loop () =
    (* At this point, we have the lock. *)
    if Kernel_scheduler.check_invariants t.kernel_scheduler then invariant t;
    match Kernel_scheduler.uncaught_exn t.kernel_scheduler with
    | Some error ->
      unlock t;
      error
    | None ->
      one_iter t;
      loop ()
  in
  let error_kind, error =
    try `User_uncaught, loop () with
    | exn ->
      `Async_uncaught, Error.create "bug in async scheduler" (exn, t) [%sexp_of: exn * t]
  in
  if raise_unhandled_exn
  then Error.raise error
  else (
    (* One reason to run [do_at_exit] handlers before printing out the error message is
       that it helps curses applications bring the terminal into a good state; otherwise
       the error message might get corrupted. Also, the OCaml top-level uncaught
       exception handler does the same. *)
    (try Caml.do_at_exit () with
     | _ -> ());
    (match error_kind with
     | `User_uncaught ->
       (* Don't use Debug.log, to avoid redundant error (task_id in particular) *)
       eprintf !"%{Sexp#hum}\n%!" [%sexp (Time_ns.now () : Time_ns.t), (error : Error.t)]
     | `Async_uncaught ->
       Debug.log "unhandled exception in Async scheduler" error [%sexp_of: Error.t];
       Debug.log_string "dumping core";
       Dump_core_on_job_delay.dump_core ());
    Unix.exit_immediately 1)
;;

let add_finalizer t heap_block f =
  Kernel_scheduler.add_finalizer t.kernel_scheduler heap_block f
;;

let add_finalizer_exn t x f =
  add_finalizer t (Heap_block.create_exn x) (fun heap_block ->
    f (Heap_block.value heap_block))
;;

let set_task_id () =
  Async_kernel_config.task_id
  := fun () ->
       let pid = Unix.getpid () in
       let thread_id = Thread.id (Thread.self ()) in
       [%sexp_of: [ `pid of Pid.t ] * [ `thread_id of int ]]
         (`pid pid, `thread_id thread_id)
;;

let go ?raise_unhandled_exn () =
  if debug then Debug.log_string "Scheduler.go";
  set_task_id ();
  let t = the_one_and_only ~should_lock:false in
  (* [go] is called from the main thread and so must acquire the lock if the thread has
     not already done so implicitly via use of an async operation that uses
     [the_one_and_only]. *)
  if not (am_holding_lock t) then lock t;
  if t.have_called_go then raise_s [%message "cannot Scheduler.go more than once"];
  t.have_called_go <- true;
  if not t.is_running
  then (
    t.is_running <- true;
    be_the_scheduler t ?raise_unhandled_exn)
  else (
    unlock t;
    (* We wake up the scheduler so it can respond to whatever async changes this thread
       made. *)
    thread_safe_wakeup_scheduler t;
    (* Since the scheduler is already running, we just pause forever. *)
    Time.pause_forever ())
;;

let go_main
      ?raise_unhandled_exn
      ?file_descr_watcher
      ?max_num_open_file_descrs
      ?max_num_threads
      ~main
      ()
  =
  (match !the_one_and_only_ref with
   | Not_ready_to_initialize | Ready_to_initialize _ -> ()
   | Initialized { initialized_at; _ } ->
     raise_s
       [%message
         "Async was initialized prior to [Scheduler.go_main]"
           (initialized_at : Backtrace.t)]);
  let max_num_open_file_descrs =
    Option.map max_num_open_file_descrs ~f:Max_num_open_file_descrs.create_exn
  in
  let max_num_threads = Option.map max_num_threads ~f:Max_num_threads.create_exn in
  the_one_and_only_ref
  := Ready_to_initialize
       (fun () -> create ?file_descr_watcher ?max_num_open_file_descrs ?max_num_threads ());
  Deferred.upon (return ()) main;
  go ?raise_unhandled_exn ()
;;
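(* A sketch of the usual entry point, via the public [Scheduler] module that wraps this
   one ([serve] is a placeholder for the application's top-level job):

   {[
     let () = Scheduler.go_main ~main:(fun () -> don't_wait_for (serve ())) ()
   ]}

   [go_main] insists on being the first Async operation in the process (it raises if the
   scheduler was already initialized), whereas [go] also handles the case where some
   other thread is already running the scheduler, in which case it just pauses forever. *)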
let is_running () =
  if is_ready_to_initialize ()
  then false
  else (the_one_and_only ~should_lock:false).is_running
;;

let report_long_cycle_times ?(cutoff = sec 1.) () =
  Stream.iter
    (long_cycles ~at_least:(cutoff |> Time_ns.Span.of_span_float_round_nearest))
    ~f:(fun span ->
      eprintf
        "%s\n%!"
        (Error.to_string_hum
           (Error.create "long async cycle" span [%sexp_of: Time_ns.Span.t])))
;;

let set_check_invariants bool = Kernel_scheduler.(set_check_invariants (t ()) bool)

let set_detect_invalid_access_from_thread bool =
  update_check_access (the_one_and_only ~should_lock:false) bool
;;

let set_max_inter_cycle_timeout span =
  (the_one_and_only ~should_lock:false).max_inter_cycle_timeout
  <- Max_inter_cycle_timeout.create_exn (Time_ns.Span.of_span_float_round_nearest span)
;;

let start_busy_poller_thread_if_not_running t =
  if not t.busy_poll_thread_is_running
  then (
    t.busy_poll_thread_is_running <- true;
    let kernel_scheduler = t.kernel_scheduler in
    let _thread : Thread.t =
      Thread.create
        (fun () ->
           let rec loop () =
             lock t;
             if Busy_pollers.is_empty t.busy_pollers
             then (
               t.busy_poll_thread_is_running <- false;
               unlock t (* We don't loop here, thus exiting the thread. *))
             else (
               Busy_pollers.poll t.busy_pollers;
               if Kernel_scheduler.num_pending_jobs kernel_scheduler > 0
               then Kernel_scheduler.run_cycle kernel_scheduler;
               unlock t;
               (* The purpose of this [yield] is to release the OCaml lock while not
                  holding the async lock, so that the busy-poll loop spends a significant
                  fraction of its time not holding both locks, which thus allows other
                  OCaml threads that want to hold both locks the chance to run. *)
               Thread.yield ();
               loop ())
           in
           loop ())
        ()
    in
    ())
;;

let add_busy_poller poll =
  let t = the_one_and_only ~should_lock:true in
  let result = Busy_pollers.add t.busy_pollers poll in
  start_busy_poller_thread_if_not_running t;
  result
;;

type 'b folder = { folder : 'a. 'b -> t -> (t, 'a) Field.t -> 'b }

let t () = the_one_and_only ~should_lock:true

let fold_fields (type a) ~init folder : a =
  let t = t () in
  let f ac field = folder.folder ac t field in
  Fields.fold
    ~init
    ~mutex:f
    ~is_running:f
    ~have_called_go:f
    ~fds_whose_watching_has_changed:f
    ~file_descr_watcher:f
    ~time_spent_waiting_for_io:f
    ~fd_by_descr:f
    ~timerfd:f
    ~timerfd_set_at:f
    ~scheduler_thread_id:f
    ~interruptor:f
    ~signal_manager:f
    ~thread_pool:f
    ~handle_thread_pool_stuck:f
    ~busy_poll_thread_is_running:f
    ~busy_pollers:f
    ~next_tsc_calibration:f
    ~kernel_scheduler:f
    ~have_lock_do_cycle:f
    ~max_inter_cycle_timeout:f
    ~min_inter_cycle_timeout:f
    ~initialized_at:f
;;

let handle_thread_pool_stuck f =
  let t = t () in
  let kernel_scheduler = t.kernel_scheduler in
  let execution_context = Kernel_scheduler.current_execution_context kernel_scheduler in
  t.handle_thread_pool_stuck
  <- (fun _ ~stuck_for ->
       Kernel_scheduler.enqueue
         kernel_scheduler
         execution_context
         (fun () -> f ~stuck_for)
         ())
;;
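(* A sketch of installing a custom stuck-thread-pool handler from user code (via the
   public [Scheduler] wrapper; the reporting body is just an example):

   {[
     Scheduler.handle_thread_pool_stuck (fun ~stuck_for ->
       eprintf "thread pool stuck for %s\n%!" (Time_ns.Span.to_string stuck_for))
   ]}

   As arranged above, the handler is enqueued as an Async job in the execution context
   that was current when it was installed, rather than being run directly from the
   scheduler's once-per-second stuck check. *)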