Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file thread_pool.ml
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
open! Core
open! Import
module Cpu_affinity = Thread_pool_cpu_affinity

(* [debug_log message a sexp_of_a] writes one human-readable sexp to stderr containing the
   current time, [message], and [a] (rendered via [sexp_of_a]), followed by a newline,
   and flushes.  It must stay above the [Time_ns] module defined below so that it refers
   to the [Time_ns] that is in scope here. *)
let debug_log message a sexp_of_a =
  let entry = [%sexp (Time_ns.now () : Time_ns.t), (message : string), (a : a)] in
  eprintf "%s\n%!" (Sexp.to_string_hum entry)
;;

(* Shadow [Time_ns] so that [sexp_of_t] renders [epoch] rather than the actual time
   whenever [am_running_test] is true. *)
module Time_ns = struct
  include Time_ns

  let sexp_of_t t =
    let shown = if am_running_test then epoch else t in
    [%sexp (shown : t)]
  ;;
end

(* Identifier of an OCaml thread, represented as the [int] returned by
   [Core.Thread.id]; the representation is hidden by the signature. *)
module Thread_id : sig
  type t [@@deriving sexp_of]

  include Hashable with type t := t

  val of_ocaml_thread : Core.Thread.t -> t
  val self : unit -> t
end = struct
  include Int

  let of_ocaml_thread = Core.Thread.id
  let self () = Core.Thread.self () |> of_ocaml_thread
end

module Priority = Linux_ext.Priority

let priority_zero = Priority.of_int 0

(* Each of the three functions below uses the [Linux_ext] implementation when it is
   [Ok], and otherwise falls back to a stub: a constant [priority_zero] for
   [getpriority], and a no-op for [setpriority] and [set_thread_name]. *)
let getpriority =
  match Linux_ext.getpriority with
  | Ok f -> f
  | Error _ -> const priority_zero
;;

let setpriority =
  match Linux_ext.setpriority with
  | Ok f -> f
  | Error _ -> Fn.ignore
;;

let set_thread_name =
  match Linux_ext.pr_set_name_first16 with
  | Ok f -> f
  | Error _ -> Fn.ignore
;;

(* When set to [true], the functions in this file trace their activity via
   [debug_log]. *)
let debug = ref false

(* We define everything in an [Internal] module, and then wrap in a
[Mutex.critical_section] each thread-safe function exposed in the mli.
When reading code here, keep in mind that there are two entry points:
(1) The functions that are exposed for external consumption in the mli (these are
protected by the mutex).
(2) Code that is called within threads created in this module. All such code should
acquire the mutex before it affects the thread state. *)

(* [Internal] holds the unsynchronized implementation of the thread pool.  None of the
   functions in it take [t.mutex] themselves, with the single exception of the worker
   loop started by [create_thread], which takes it around its bookkeeping. *)
module Internal = struct
  module Mutex = Nano_mutex

  let check_invariant = ref false
  let error = Or_error.error

  module Pool_id : Unique_id = Unique_id.Int63 ()

  (* A unit of work to be run by some thread of the pool. *)
  module Work = struct
    type t =
      { (* When this work starts running, the name of the thread will be set (via
           [Linux_ext.pr_set_name]) to [name]. *)
        name : string
      ; doit : unit -> unit
      ; priority : Priority.t
      }
    [@@deriving sexp_of]
  end

  (* The per-thread queue through which a thread receives either work or the request to
     stop. *)
  module Work_queue = struct
    type elt =
      | Stop
      | Work of Work.t
    [@@deriving sexp_of]

    type t = elt Squeue.t [@@deriving sexp_of]

    let create () = Squeue.create 1
    let enqueue t work = Squeue.push_uncond t work
  end

  module Helper_thread = struct
    type 'thread t =
      { in_pool : Pool_id.t
      ; mutable state : [ `In_use | `Finishing | `Finished ]
      ; thread : 'thread
      (* [default_name] will be used as the name of work run by the helper thread,
         unless that work is added with an overriding name. *)
      ; default_name : string
      (* [default_priority] will be used as the priority of work run by the helper
         thread, unless that work is added with an overriding priority. *)
      ; default_priority : Priority.t
      }
    [@@deriving fields, sexp_of]
  end

  module Thread = struct
    type t =
      { (* [name] is the name of the thread that the OS knows, i.e. the argument supplied
           to the most recent call to [set_thread_name] by the thread. *)
        mutable name : string
      (* [thread_id] is the OCaml thread id of the OCaml thread that this corresponds
         to.  It is an option only because we create this object before creating the
         thread.  We set it to [Some] as soon as we create the thread, and then never
         change it. *)
      ; mutable thread_id : Thread_id.t option
      (* [priority] is the priority of the thread that the OS knows, i.e. the argument
         supplied in the most recent call to [setpriority] by the thread. *)
      ; mutable priority : Priority.t
      (* A thread can be "available", meaning that it isn't working on anything, or
         doing work added to the thread pool, or serving as a helper thread. *)
      ; mutable state :
          [ `Available | `Working | `Helper of (t[@sexp.opaque]) Helper_thread.t ]
      (* [unfinished_work] is the amount of work remaining for this thread to do.  It
         includes all the work in [work_queue], plus perhaps an additional work that is
         running. *)
      ; mutable unfinished_work : int
      (* [work_queue] is where this thread pulls work from.  Each thread has its own
         queue.  If a thread is working for the general pool, then its work queue has at
         most one element.  If a thread is a helper thread, then the work queue has all
         the unfinished work that has been added for the helper thread. *)
      ; work_queue : Work_queue.t
      }
    [@@deriving fields, sexp_of]

    (* Checks that [thread_id] is set and that [unfinished_work] equals the length of
       [work_queue] or exceeds it by one.  Raises, including a dump of [t], on
       failure. *)
    let invariant t : unit =
      try
        let check invariant field = invariant (Field.get field t) in
        Fields.iter
          ~name:ignore
          ~thread_id:(check (fun o -> assert (is_some o)))
          ~priority:ignore
          ~state:ignore
          ~unfinished_work:
            (check (fun unfinished_work ->
               assert (
                 unfinished_work = Squeue.length t.work_queue
                 || unfinished_work = Squeue.length t.work_queue + 1)))
          ~work_queue:ignore
      with
      | exn -> raise_s [%message "Thread.invariant failed" (exn : exn) ~thread:(t : t)]
    ;;

    let is_available t =
      match t.state with
      | `Available -> true
      | `Working | `Helper _ -> false
    ;;

    (* Creates the bookkeeping record only; the OCaml thread itself is started by
       [create_thread] below, which then fills in [thread_id]. *)
    let create priority =
      { name = ""
      ; thread_id = None
      ; priority
      ; state = `Available
      ; unfinished_work = 0
      ; work_queue = Work_queue.create ()
      }
    ;;

    let enqueue_work t work =
      t.unfinished_work <- t.unfinished_work + 1;
      Work_queue.enqueue t.work_queue (Work work)
    ;;

    (* Asks the thread's loop to exit by enqueueing [Stop]. *)
    let stop t = Work_queue.enqueue t.work_queue Stop

    (* Applies [t.name], [t.priority] and [cpu_affinity] to the calling thread.  Called
       as the first step of the function passed to [Core.Thread.create] in
       [create_thread]. *)
    let initialize_ocaml_thread t (cpu_affinity : Cpu_affinity.t) =
      set_thread_name t.name;
      (* We call [getpriority] to see whether we need to set the priority.  This is not a
         performance optimization.  It is done so that in programs that don't use
         priorities, we never call [setpriority], and thus prevent problems due to the
         user's "ulimit -e" being too restrictive.  The use of [getpriority] is limited to
         initialization, and is not used elsewhere in this module. *)
      if not (Priority.equal (getpriority ()) t.priority) then setpriority t.priority;
      match cpu_affinity with
      | Inherit -> ()
      | Cpuset cpuset ->
        Or_error.ok_exn Core.Thread.setaffinity_self_exn (cpuset |> Cpu_affinity.Cpuset.raw)
    ;;

    (* Calls [set_thread_name] only when [name] differs from the recorded one. *)
    let set_name t name =
      if String.( <> ) name t.name
      then (
        set_thread_name name;
        t.name <- name)
    ;;

    (* Calls [setpriority] only when [priority] differs from the recorded one. *)
    let set_priority t priority =
      if not (Priority.equal t.priority priority)
      then (
        setpriority priority;
        t.priority <- priority)
    ;;
  end

  module Thread_creation_failure = struct
    type t =
      { at : Time_ns.t
      ; error : Error.t
      }
    [@@deriving fields, sexp_of]
  end

  (* [Thread_pool.t] *)
  type t =
    { id : Pool_id.t
    (* [state] starts as [`In_use] when the thread pool is created.  When the user calls
       [finished_with], it transitions to [`Finishing].  When the last work is done, it
       transitions to [`Finished] and fills [finished]. *)
    ; mutable state : [ `In_use | `Finishing | `Finished ]
    ; finished : unit Thread_safe_ivar.t
    (* [mutex] is used to protect all access to [t] and its substructures, since the
       threads actually doing the work need to access [t]. *)
    ; mutex : Mutex.t
    (* [default_priority] is the priority that will be used for work unless that work is
       added with an overriding priority.  It is set to whatever the priority is when the
       thread pool is created. *)
    ; default_priority : Priority.t
    (* [max_num_threads] is the maximum number of threads that the thread pool is allowed
       to create. *)
    ; max_num_threads : int
    (* [cpu_affinity] is the desired CPU affinity for threads in this pool. *)
    ; cpu_affinity : Cpu_affinity.t
    (* [num_threads] is the number of threads that have been created by the pool.  The
       thread pool guarantees that [num_threads <= max_num_threads]. *)
    ; mutable num_threads : int
    (* [thread_creation_failure_lockout] is the amount of time that must pass after a
       thread-creation failure before the thread pool will make another attempt to create
       a thread. *)
    ; mutable thread_creation_failure_lockout : Time_ns.Span.t
    (* [last_thread_creation_failure] has information about the last time that
       [Core.Thread.create] raised. *)
    ; mutable last_thread_creation_failure : Thread_creation_failure.t option
    (* [thread_by_id] holds all the threads that have been created by the pool. *)
    ; mutable thread_by_id : Thread.t Thread_id.Table.t
    (* [available_threads] holds all threads that have [state = `Available].  It is used
       as a stack so that the most recently used available thread is used next, on the
       theory that this is better for locality. *)
    ; mutable available_threads : Thread.t list
    (* [work_queue] holds work to be done for which no thread is available. *)
    ; work_queue : Work.t Queue.t
    (* [unfinished_work] holds the amount of work that has been submitted to the pool but
       not yet been completed. *)
    ; mutable unfinished_work : int
    (* [num_work_completed] is incremented by a worker thread each time a call to
       [Work.doit] returns or raises. *)
    ; mutable num_work_completed : int
    }
  [@@deriving fields, sexp_of]

  (* Checks the consistency conditions between the fields of [t]; raises, including a
     dump of [t], if any of them fails. *)
  let invariant t : unit =
    try
      let check invariant field = invariant (Field.get field t) in
      Fields.iter
        ~id:ignore
        ~state:
          (check (function
            | `In_use | `Finishing -> ()
            | `Finished ->
              assert (t.unfinished_work = 0);
              assert (t.num_threads = 0)))
        ~finished:ignore
        ~cpu_affinity:ignore
        ~mutex:(check Mutex.invariant)
        ~default_priority:ignore
        ~max_num_threads:(check (fun max_num_threads -> assert (max_num_threads >= 1)))
        ~num_threads:
          (check (fun num_threads ->
             assert (num_threads = Hashtbl.length t.thread_by_id);
             assert (num_threads <= t.max_num_threads)))
        ~thread_creation_failure_lockout:ignore
        ~last_thread_creation_failure:ignore
        ~thread_by_id:
          (check (fun thread_by_id ->
             Thread_id.Table.invariant Thread.invariant thread_by_id))
        ~available_threads:
          (check (fun available_threads ->
             assert (List.length available_threads <= t.num_threads);
             List.iter available_threads ~f:(fun thread ->
               assert (
                 Hashtbl.exists t.thread_by_id ~f:(fun thread' ->
                   phys_equal thread thread'));
               assert (Thread.is_available thread))))
        ~work_queue:
          (check (fun work_queue ->
             (* It is possible that:
                {[
                  has_unstarted_work t
                  && t.num_threads < t.max_num_threads ]}
                This happens when adding work and [Core.Thread.create] raises.  In that
                case, the thread pool enqueues the work and continues with the threads it
                has.  If the thread pool can't make progress, then Async's thread-pool-stuck
                detection will later report it. *)
             assert (Queue.is_empty work_queue || List.is_empty t.available_threads)))
        ~unfinished_work:(check (fun unfinished_work -> assert (unfinished_work >= 0)))
        ~num_work_completed:
          (check (fun num_work_completed -> assert (num_work_completed >= 0)))
    with
    | exn ->
      raise_s [%message "Thread_pool.invariant failed" (exn : exn) ~thread_pool:(t : t)]
  ;;

  let is_finished t =
    match t.state with
    | `Finished -> true
    | `Finishing | `In_use -> false
  ;;

  let is_in_use t =
    match t.state with
    | `In_use -> true
    | `Finishing | `Finished -> false
  ;;

  let has_unstarted_work t = not (Queue.is_empty t.work_queue)

  (* Returns an error if a [Cpuset] affinity is requested while
     [Core.Thread.setaffinity_self_exn] is an error, or if [max_num_threads < 1].
     Otherwise returns a fresh pool in state [`In_use] with no threads. *)
  let create ?(cpu_affinity = Cpu_affinity.Inherit) ~max_num_threads () =
    if match cpu_affinity with
       | Inherit -> false
       | Cpuset _ -> Or_error.is_error Core.Thread.setaffinity_self_exn
    then
      Or_error.error_string "Thread_pool.create setaffinity not supported on this platform"
    else if max_num_threads < 1
    then error "Thread_pool.create max_num_threads was < 1" max_num_threads [%sexp_of: int]
    else (
      let t =
        { id = Pool_id.create ()
        ; state = `In_use
        ; finished = Thread_safe_ivar.create ()
        ; mutex = Mutex.create ()
        ; default_priority = getpriority ()
        ; cpu_affinity
        ; max_num_threads
        ; num_threads = 0
        ; thread_by_id = Thread_id.Table.create ()
        ; thread_creation_failure_lockout = sec 1.
        ; last_thread_creation_failure = None
        ; available_threads = []
        ; work_queue = Queue.create ()
        ; unfinished_work = 0
        ; num_work_completed = 0
        }
      in
      Ok t)
  ;;

  (* If the pool is [`Finishing] and has no unfinished work, moves it to [`Finished]:
     fills [finished], stops every thread, and empties [thread_by_id] and
     [available_threads].  [Fields.iter] is used so that adding a field to [t] forces
     this function to be revisited. *)
  let maybe_finish t =
    match t.state with
    | `In_use | `Finished -> ()
    | `Finishing ->
      if t.unfinished_work = 0
      then (
        let set x f = Option.value_exn (Field.setter f) t x in
        Fields.iter
          ~id:ignore
          ~state:(set `Finished)
          ~finished:(fun _ -> Thread_safe_ivar.fill t.finished ())
          ~mutex:ignore
          ~default_priority:ignore
          ~cpu_affinity:ignore
          ~max_num_threads:ignore
          ~num_threads:(set 0)
          ~thread_creation_failure_lockout:ignore
          ~last_thread_creation_failure:ignore
          ~thread_by_id:(fun _ ->
            Hashtbl.iter t.thread_by_id ~f:Thread.stop;
            Hashtbl.clear t.thread_by_id)
          ~available_threads:(set [])
          ~work_queue:ignore
          ~unfinished_work:ignore
          ~num_work_completed:ignore)
  ;;

  (* Moves an [`In_use] pool to [`Finishing] and finishes it immediately if no work is
     outstanding.  Does nothing on a pool that is already finishing or finished. *)
  let finished_with t =
    if !debug then debug_log "Thread_pool.finished_with" t [%sexp_of: t];
    match t.state with
    | `Finishing | `Finished -> ()
    | `In_use ->
      t.state <- `Finishing;
      maybe_finish t
  ;;

  let assign_work_to_thread (thread : Thread.t) work =
    thread.state <- `Working;
    Thread.enqueue_work thread work
  ;;

  (* Hands [thread] the next queued work if there is any; otherwise marks it
     [`Available], pushes it on [available_threads], and checks whether the pool can
     finish. *)
  let make_thread_available t thread =
    if !debug
    then debug_log "make_thread_available" (thread, t) [%sexp_of: Thread.t * t];
    match Queue.dequeue t.work_queue with
    | Some work -> assign_work_to_thread thread work
    | None ->
      thread.state <- `Available;
      t.available_threads <- thread :: t.available_threads;
      maybe_finish t
  ;;

  (* Once a [`Finishing] helper thread has no unfinished work, marks it [`Finished] and
     returns its thread to the general pool. *)
  let maybe_finish_helper_thread t (helper_thread : Thread.t Helper_thread.t) =
    match helper_thread.state with
    | `In_use | `Finished -> ()
    | `Finishing ->
      let thread = helper_thread.thread in
      if thread.unfinished_work = 0
      then (
        helper_thread.state <- `Finished;
        make_thread_available t thread)
  ;;

  (* Starts a new OCaml thread that loops popping its own work queue until it sees
     [Stop].  On success the thread is registered in [thread_by_id] and [num_threads] is
     incremented; if [Core.Thread.create] raises, the exception is returned as an
     [Error] and [t] is left unchanged. *)
  let create_thread t =
    if !debug then debug_log "create_thread" t [%sexp_of: t];
    let thread = Thread.create t.default_priority in
    let ocaml_thread =
      Or_error.try_with (fun () ->
        Core.Thread.create
          (fun () ->
            Thread.initialize_ocaml_thread thread t.cpu_affinity;
            let rec loop () =
              match Squeue.pop thread.work_queue with
              | Stop -> ()
              | Work work ->
                if !debug
                then
                  debug_log
                    "thread got work"
                    (work, thread, t)
                    [%sexp_of: Work.t * Thread.t * t];
                Thread.set_name thread work.name;
                Thread.set_priority thread work.priority;
                (* Exceptions raised by the work are deliberately dropped so that the
                   thread survives and the bookkeeping below always runs. *)
                (try work.doit () (* the actual work *) with
                 | _ -> ());
                Mutex.critical_section t.mutex ~f:(fun () ->
                  (* [num_work_completed] is updated, and [t] is dumped for debugging,
                     while holding the mutex, like every other access to [t] made from a
                     worker thread. *)
                  t.num_work_completed <- t.num_work_completed + 1;
                  if !debug
                  then
                    debug_log
                      "thread finished with work"
                      (work, thread, t)
                      [%sexp_of: Work.t * Thread.t * t];
                  t.unfinished_work <- t.unfinished_work - 1;
                  thread.unfinished_work <- thread.unfinished_work - 1;
                  match thread.state with
                  | `Available ->
                    raise_s
                      [%message
                        "thread-pool thread unexpectedly available" (thread : Thread.t)]
                  | `Helper helper_thread -> maybe_finish_helper_thread t helper_thread
                  | `Working -> make_thread_available t thread);
                loop ()
            in
            loop ())
          ())
    in
    Or_error.map ocaml_thread ~f:(fun ocaml_thread ->
      let thread_id = Thread_id.of_ocaml_thread ocaml_thread in
      thread.thread_id <- Some thread_id;
      t.num_threads <- t.num_threads + 1;
      Hashtbl.add_exn t.thread_by_id ~key:thread_id ~data:thread;
      thread)
  ;;

  (* Time of the last thread-creation failure, or [Time_ns.epoch] if there was none. *)
  let last_thread_creation_failure_at t =
    Option.value_map
      t.last_thread_creation_failure
      ~default:Time_ns.epoch
      ~f:Thread_creation_failure.at
  ;;

  (* Returns [`Ok thread] using, in order of preference, the most recently released
     available thread or a newly created one.  Returns [`None_available] when the pool
     is at [max_num_threads], when the last creation failure is more recent than
     [thread_creation_failure_lockout], or when creating a thread fails now (in which
     case the failure is recorded). *)
  let get_available_thread t =
    if !debug then debug_log "get_available_thread" t [%sexp_of: t];
    match t.available_threads with
    | thread :: rest ->
      t.available_threads <- rest;
      `Ok thread
    | [] ->
      if t.num_threads = t.max_num_threads
      then `None_available
      else (
        let now = Time_ns.now () in
        if Time_ns.Span.( < )
             (Time_ns.diff now (last_thread_creation_failure_at t))
             t.thread_creation_failure_lockout
        then `None_available
        else (
          match create_thread t with
          | Ok thread -> `Ok thread
          | Error error ->
            t.last_thread_creation_failure <- Some { at = now; error };
            `None_available))
  ;;

  let inc_unfinished_work t = t.unfinished_work <- t.unfinished_work + 1
  let default_thread_name = "thread-pool thread"

  (* Submits [doit] to the pool: it is given to a thread if one can be obtained, and
     queued in [t.work_queue] otherwise.  Returns an error, without submitting, unless
     the pool is [`In_use]. *)
  let add_work ?priority ?name t doit =
    if !debug then debug_log "add_work" t [%sexp_of: t];
    if not (is_in_use t)
    then error "add_work called on finished thread pool" t [%sexp_of: t]
    else (
      let work =
        { Work.doit
        ; name = Option.value name ~default:default_thread_name
        ; priority = Option.value priority ~default:t.default_priority
        }
      in
      inc_unfinished_work t;
      (match get_available_thread t with
       | `None_available -> Queue.enqueue t.work_queue work
       | `Ok thread -> assign_work_to_thread thread work);
      Ok ())
  ;;

  (* Shared implementation of [create_helper_thread] and [become_helper_thread]: obtains
     a thread with [get_thread] and turns it into a helper thread of [t]. *)
  let become_helper_thread_internal
        ?priority
        ?name
        t
        ~(get_thread : t -> Thread.t Or_error.t)
    =
    if !debug then debug_log "become_helper_thread_internal" t [%sexp_of: t];
    if not (is_in_use t)
    then
      error
        "become_helper_thread_internal called on finished thread pool"
        t
        [%sexp_of: t]
    else (
      match get_thread t with
      | Error _ as e -> e
      | Ok thread ->
        let helper_thread =
          { Helper_thread.default_name = Option.value name ~default:"helper_thread"
          ; default_priority = Option.value priority ~default:t.default_priority
          ; in_pool = t.id
          ; state = `In_use
          ; thread
          }
        in
        thread.state <- `Helper helper_thread;
        Ok helper_thread)
  ;;

  (* Makes a helper thread out of a thread obtained with [get_available_thread]; errors
     if none can be obtained. *)
  let create_helper_thread ?priority ?name t =
    become_helper_thread_internal ?priority ?name t ~get_thread:(fun t ->
      match get_available_thread t with
      | `Ok thread -> Ok thread
      | `None_available ->
        error ~strict:() "create_helper_thread could not get a thread" t [%sexp_of: t])
  ;;

  (* Makes a helper thread out of the calling thread, which must be one of the threads
     registered in [t.thread_by_id]; errors otherwise. *)
  let become_helper_thread ?priority ?name t =
    become_helper_thread_internal ?priority ?name t ~get_thread:(fun t ->
      match Hashtbl.find t.thread_by_id (Thread_id.self ()) with
      | Some thread -> Ok thread
      | None ->
        Or_error.error_string "become_helper_thread not called within thread-pool thread")
  ;;

  (* Enqueues [doit] on the helper thread's own queue.  Errors if the helper thread
     belongs to another pool, if the pool is not [`In_use], or if the helper thread is
     no longer [`In_use]. *)
  let add_work_for_helper_thread ?priority ?name t helper_thread doit =
    if !debug
    then
      debug_log
        "add_work_for_helper_thread"
        (helper_thread, t)
        [%sexp_of: Thread.t Helper_thread.t * t];
    if not (Pool_id.equal t.id helper_thread.in_pool)
    then
      error
        "add_work_for_helper_thread called on helper thread not in pool"
        (helper_thread, t)
        [%sexp_of: Thread.t Helper_thread.t * t]
    else if not (is_in_use t)
    then error "add_work_for_helper_thread called on finished thread pool" t [%sexp_of: t]
    else (
      match helper_thread.state with
      | `Finishing | `Finished ->
        error
          "add_work_for_helper_thread called on helper thread no longer in use"
          (helper_thread, t)
          [%sexp_of: Thread.t Helper_thread.t * t]
      | `In_use ->
        let { Helper_thread.thread; _ } = helper_thread in
        inc_unfinished_work t;
        Thread.enqueue_work
          thread
          { Work.name =
              Option.value name ~default:(Helper_thread.default_name helper_thread)
          ; doit
          ; priority =
              Option.value
                priority
                ~default:(Helper_thread.default_priority helper_thread)
          };
        Ok ())
  ;;

  (* Moves an [`In_use] helper thread to [`Finishing] and releases its thread right away
     if it has no unfinished work.  Raises if the helper thread belongs to another
     pool; does nothing if it is already finishing or finished. *)
  let finished_with_helper_thread t helper_thread =
    if !debug
    then
      debug_log
        "finished_with_helper_thread"
        (helper_thread, t)
        [%sexp_of: Thread.t Helper_thread.t * t];
    if not (Pool_id.equal t.id helper_thread.in_pool)
    then
      raise_s
        [%message
          "finished_with_helper_thread called on helper thread not in pool"
            (helper_thread : Thread.t Helper_thread.t)
            ~thread_pool:(t : t)]
    else (
      match helper_thread.state with
      | `Finishing | `Finished -> ()
      | `In_use ->
        helper_thread.state <- `Finishing;
        maybe_finish_helper_thread t helper_thread)
  ;;
end

(* Now we define everything to be exported, being careful to wrap everything in
[Mutex.critical_section] that needs to be. *)

open Internal

type t = Internal.t [@@deriving sexp_of]

let thread_creation_failure_lockout = Internal.thread_creation_failure_lockout

(* The last thread-creation failure, if any, rendered as a sexp. *)
let last_thread_creation_failure t =
  t.last_thread_creation_failure |> Option.map ~f:Thread_creation_failure.sexp_of_t
;;

(* Runs [f] while holding [t.mutex].  Afterwards, whether [f] returned or raised, the
   unsynchronized [Internal.invariant] is run on [t] if [check_invariant] is set. *)
let critical_section t ~f =
  let check_after () = if !check_invariant then Internal.invariant t in
  Mutex.critical_section t.mutex ~f:(fun () -> protect ~f ~finally:check_after)
;;

(* From here on, [invariant] is the mutex-protected version. *)
let invariant t = critical_section t ~f:(fun () -> Internal.invariant t)

let create ?cpu_affinity ~max_num_threads () =
  Internal.create ?cpu_affinity ~max_num_threads ()
  |> Result.map ~f:(fun t ->
    if !check_invariant then invariant t;
    t)
;;

let finished_with t = critical_section t ~f:(fun () -> Internal.finished_with t)

(* We do not use [critical_section] for [block_until_finished] because it is already
   thread safe, and we do not want it to hold [t]'s lock while blocking, because we must
   allow the finishing threads to acquire [t]'s lock. *)
let block_until_finished t = Thread_safe_ivar.read t.finished

(* Plain field readers, re-exported from [Internal] without taking the mutex. *)
let cpu_affinity = Internal.cpu_affinity
let default_priority = Internal.default_priority
let has_unstarted_work = Internal.has_unstarted_work
let max_num_threads = Internal.max_num_threads
let num_threads = Internal.num_threads
let num_work_completed = Internal.num_work_completed
let unfinished_work = Internal.unfinished_work

module Helper_thread = struct
  (* Inside this definition, [Helper_thread] still denotes [Internal.Helper_thread]. *)
  type t = Thread.t Helper_thread.t [@@deriving sexp_of]

  let default_name = Helper_thread.default_name
  let default_priority = Helper_thread.default_priority
end

(* Each function below is the [Internal] function of the same name, run inside
   [critical_section]. *)
let add_work ?priority ?name t doit =
  critical_section t ~f:(fun () -> Internal.add_work ?priority ?name t doit)
;;

let become_helper_thread ?priority ?name t =
  critical_section t ~f:(fun () -> Internal.become_helper_thread ?priority ?name t)
;;

let create_helper_thread ?priority ?name t =
  critical_section t ~f:(fun () -> Internal.create_helper_thread ?priority ?name t)
;;

let add_work_for_helper_thread ?priority ?name t helper_thread doit =
  critical_section t ~f:(fun () ->
    Internal.add_work_for_helper_thread ?priority ?name t helper_thread doit)
;;

let finished_with_helper_thread t helper_thread =
  critical_section t ~f:(fun () -> Internal.finished_with_helper_thread t helper_thread)
;;

module Private = struct
  let check_invariant = Internal.check_invariant
  let default_thread_name = Internal.default_thread_name
  let is_finished = Internal.is_finished
  let is_in_use = Internal.is_in_use

  (* Records a thread-creation failure at time [at] with a placeholder error.  Note that
     this writes to [t] without taking the mutex. *)
  let set_last_thread_creation_failure t at =
    let failure =
      { Thread_creation_failure.at; error = Error.of_string "fake-error" }
    in
    t.last_thread_creation_failure <- Some failure
  ;;

  (* Overrides the lockout period; like the setter above, this does not take the
     mutex. *)
  let set_thread_creation_failure_lockout t span =
    t.thread_creation_failure_lockout <- span
  ;;
end