package async_unix


Dispatches and monitors Async processes.

The threading model is as follows. Only one thread runs Async code at a time. This is enforced by a single lock in Async's scheduler data structure. There are any number of threads running code without holding the lock that get data from the outside world and want to affect the Async world. They do this by calling Thread_safe.run_in_async*, which acquires the lock, does a computation (e.g., fills an ivar), and then runs a "cycle" of Async computations.

type t
val sexp_of_t : t -> Sexplib0.Sexp.t
include module type of struct include Async_kernel.Async_kernel_scheduler end
type 'a with_options := ?monitor:Async_kernel.Monitor.t -> ?priority:Async_kernel.Priority.t -> 'a
val current_execution_context : unit -> Async_kernel.Execution_context.t
val within_context : Async_kernel.Execution_context.t -> (unit -> 'a) -> ('a, unit) Core.Result.t

within_context context f runs f () right now with the specified execution context. If f raises, then the exception is sent to the monitor of context, and Error () is returned.

within' f ~monitor ~priority runs f () right now, with the monitor and priority set as specified. They will be reset to their original values when f returns. If f raises, then the result of within' will never become determined, but the exception will end up in the specified monitor.
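As a sketch of how the context-capturing functions above fit together (the helper name is hypothetical), one can save the current execution context and later run a thunk under it, with any exception routed to that context's monitor:

```ocaml
open Async

(* Hypothetical sketch: capture the current execution context, then
   later run a thunk under it.  An exception inside [f] is sent to the
   captured context's monitor and surfaces here as [Error ()]. *)
let capture_context () =
  let context = Scheduler.current_execution_context () in
  fun f ->
    match Scheduler.within_context context f with
    | Ok result -> Some result
    | Error () -> None (* exception went to [context]'s monitor *)
```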

val within : ((unit -> unit) -> unit) with_options

within is like within', but doesn't require the thunk to return a deferred.

val within_v : ((unit -> 'a) -> 'a option) with_options

within_v is like within, but allows a value to be returned by f.

val with_local : 'a Core.Univ_map.Key.t -> 'a option -> f:(unit -> 'b) -> 'b

with_local key value ~f, when run in the current execution context, e, runs f right now in a new execution context, e', that is identical to e except that find_local key = value. As usual, e' will be in effect in asynchronous computations started by f. When with_local returns, the execution context is restored to e.
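The with_local/find_local pair is commonly used to thread a value through the execution context instead of through every function argument. A minimal sketch, assuming a hypothetical request-id key:

```ocaml
open Core
open Async

(* Hypothetical key: a request id carried implicitly by the execution
   context rather than passed to every function. *)
let request_id_key : string Univ_map.Key.t =
  Univ_map.Key.create ~name:"request-id" String.sexp_of_t

(* Run [f] in a context where [find_local request_id_key = Some id];
   asynchronous jobs started by [f] see the same binding. *)
let with_request_id id ~f = Scheduler.with_local request_id_key (Some id) ~f

let current_request_id () = Scheduler.find_local request_id_key
```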

val find_local : 'a Core.Univ_map.Key.t -> 'a option

find_local key returns the value associated to key in the current execution context.

val schedule' : ((unit -> 'a Async_kernel.Deferred.t) -> 'a Async_kernel.Deferred.t) with_options

Just like within', but instead of running the thunk right now, adds it to the Async queue to be run with other Async jobs.

val schedule : ((unit -> unit) -> unit) with_options

Just like schedule', but doesn't require the thunk to return a deferred.
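For example, one might enqueue a low-priority job rather than running it inline; this is a sketch, with ~priority being one of the with_options shared by within and schedule:

```ocaml
open Async

(* Sketch: enqueue a background job at low priority instead of running
   it right now.  It will run in a later part of the Async cycle. *)
let log_in_background message =
  Scheduler.schedule ~priority:Priority.low (fun () ->
    printf "%s\n" message)
```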

val enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit

enqueue_job execution_context f a enqueues into the scheduler's job queue a job that will run f a in execution_context.

val thread_safe_enqueue_job : Async_kernel.Execution_context.t -> ('a -> unit) -> 'a -> unit

thread_safe_enqueue_job is like enqueue_job, except it is for use from a thread that doesn't hold the Async lock.

val preserve_execution_context : ('a -> unit) -> ('a -> unit) Core.Staged.t

preserve_execution_context f saves the current execution context and returns a function g such that g a runs f a in the saved execution context. g a becomes determined when f a becomes determined.
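This is typically used to stage a callback handed to foreign code so that, when invoked later, it still runs with the monitor and locals captured here. A sketch:

```ocaml
open Core
open Async

(* Sketch: build a callback that, whenever it is invoked later, runs in
   the execution context that was current when [make_callback] ran. *)
let make_callback () =
  Staged.unstage
    (Scheduler.preserve_execution_context (fun message ->
       printf "callback: %s\n" message))
```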

val preserve_execution_context' : ('a -> 'b Async_kernel.Deferred.t) -> ('a -> 'b Async_kernel.Deferred.t) Core.Staged.t
val cycle_start : unit -> Core.Time_float.t

cycle_start () returns the result of Time.now () called at the beginning of the current cycle.

val cycle_start_ns : unit -> Core.Time_ns.t

cycle_times () returns a stream that is extended with an element at the start of each Async cycle, with the amount of time that the previous cycle took, as determined by calls to Time.now at the beginning and end of the cycle.

last_cycle_time returns the time spent in the most recently completed cycle.

long_cycles ~at_least returns a stream of cycles whose duration is at least at_least. long_cycles is more efficient than cycle_times because it only allocates a stream entry when there is a long cycle, rather than on every cycle.

val cycle_count : unit -> int

cycle_count () returns the total number of Async cycles that have happened.

total_cycle_time () returns the total (wall) time spent executing jobs in Async cycles.

val event_precision : unit -> Core.Time_float.Span.t

The alarm_precision of the timing-wheel used to implement Async's Clock.

val event_precision_ns : unit -> Core.Core_private.Time_ns_alternate_sexp.Span.t
val force_current_cycle_to_end : unit -> unit

force_current_cycle_to_end () causes no more normal priority jobs to run in the current cycle, and for the end-of-cycle jobs (i.e., writes) to run, and then for the cycle to end.

val set_max_num_jobs_per_priority_per_cycle : int -> unit

set_max_num_jobs_per_priority_per_cycle int sets the maximum number of jobs that will be done at each priority within each Async cycle. The default is 500. max_num_jobs_per_priority_per_cycle retrieves the current value.

val max_num_jobs_per_priority_per_cycle : unit -> int
val set_record_backtraces : bool -> unit

set_record_backtraces do_record sets whether Async should keep in the execution context the history of stack backtraces (obtained via Backtrace.get) that led to the current job. If an Async job has an unhandled exception, this backtrace history will be recorded in the exception. In particular the history will appear in an unhandled exception that reaches the main monitor. This can have a substantial performance impact, both in running time and space usage.

val recording_backtraces : unit -> bool
val yield : unit -> unit Async_kernel.Deferred.t

yield () returns a deferred that becomes determined after the current cycle completes. This can be useful to improve fairness by yielding within a computation to give other jobs a chance to run.
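A typical fairness pattern is to yield between iterations of a long loop so other jobs get a turn; a sketch, where handle is a hypothetical per-item function:

```ocaml
open Async

(* Sketch: process a long list without monopolizing the scheduler by
   yielding after each item; [handle] is a hypothetical synchronous
   per-item function. *)
let rec process_all ~handle = function
  | [] -> return ()
  | item :: rest ->
    handle item;
    Scheduler.yield () >>= fun () -> process_all ~handle rest
```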

val yield_until_no_jobs_remain : ?may_return_immediately:bool -> unit -> unit Async_kernel.Deferred.t

yield_until_no_jobs_remain () returns a deferred that becomes determined the next time Async's job queue is empty. This is useful in tests when one needs to wait for the completion of all the jobs based on what's in the queue, when those jobs might create other jobs -- without depending on I/O or the passage of wall-clock time.

may_return_immediately determines how yield_until_no_jobs_remain behaves if the job queue is currently empty. If may_return_immediately = true, then yield_until_no_jobs_remain will return (). If may_return_immediately = false, then yield_until_no_jobs_remain's result will become determined after the next Async cycle. We hope to someday change the default may_return_immediately from false to true.

val yield_every : n:int -> (unit -> unit Async_kernel.Deferred.t) Core.Staged.t

yield_every ~n returns a function that will act as yield every n calls and as return () the rest of the time. This is useful for improving fairness in circumstances where you don't have good control of the batch size, but can insert a deferred into every iteration.

yield_every raises if n <= 0.
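When yielding on every iteration is too costly, yield_every amortizes it; a sketch under the same hypothetical handle function:

```ocaml
open Core
open Async

(* Sketch: yield to the scheduler once every 100 iterations; the other
   99 calls to [yield] return [return ()] and are cheap. *)
let process_all ~handle items =
  let yield = Staged.unstage (Scheduler.yield_every ~n:100) in
  Deferred.List.iter ~how:`Sequential items ~f:(fun item ->
    handle item;
    yield ())
```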

val num_jobs_run : unit -> int

num_jobs_run () returns the number of jobs that have been run since starting. The returned value includes the currently running job.

val num_pending_jobs : unit -> int

num_pending_jobs returns the number of jobs that are queued to run by the scheduler.

module Which_watcher : sig ... end
val t : unit -> t

t () returns the Async scheduler. If the scheduler hasn't been created yet, this will create it and acquire the Async lock.

Accessors

val max_num_open_file_descrs : unit -> int
val max_num_threads : unit -> int
val go : ?raise_unhandled_exn:bool -> unit -> Core.never_returns

go ?raise_unhandled_exn () passes control to Async, at which point Async starts running handlers, one by one without interruption, until there are no more handlers to run. When Async is out of handlers, it blocks until the outside world schedules more of them. Because of this, Async programs do not exit until shutdown is called.

go () calls handle_signal Sys.sigpipe, which causes the SIGPIPE signal to be ignored. Low-level syscalls (e.g., write) still raise EPIPE.

If any Async job raises an unhandled exception that is not handled by any monitor, Async execution ceases. Then, by default, Async pretty prints the exception, and exits with status 1. If you don't want this, pass ~raise_unhandled_exn:true, which will cause the unhandled exception to be raised to the caller of go ().
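A minimal program driven by go looks like the following sketch: schedule some work, hand control to Async, and exit via shutdown, since go itself never returns:

```ocaml
open Core
open Async

(* Minimal sketch of a [Scheduler.go]-driven program: print after one
   second, then shut down.  [go] never returns, so the program only
   exits via [shutdown]. *)
let () =
  upon (after (Time_float.Span.of_sec 1.)) (fun () ->
    print_endline "done";
    shutdown 0);
  never_returns (Scheduler.go ())
```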

val go_main : ?raise_unhandled_exn:bool -> ?file_descr_watcher:Which_watcher.t -> ?max_num_open_file_descrs:int -> ?max_num_threads:int -> main:(unit -> unit) -> unit -> Core.never_returns

go_main is like go, except that you supply a main function that will be run to initialize the Async computation, and go_main will fail if any Async code has been run before go_main is called. It also allows you to configure more static options of the scheduler.

val raise_if_any_jobs_were_scheduled : unit -> unit

raise_if_any_jobs_were_scheduled () will raise an exception if any async work has ever been scheduled. This is intended to be called before a program starts, for instance before Command.run, to ensure that no libraries have started async work. This can happen by mistake (by calling a deferred function at toplevel), which can make the behavior of the program unexpectedly non-deterministic.

val report_long_cycle_times : ?cutoff:Time_float_unix.Span.t -> unit -> unit

report_long_cycle_times ?cutoff () installs a handler that prints a warning to stderr whenever an Async cycle takes longer than cutoff, whose default is 1s.

val is_running : unit -> bool

is_running () returns true if the scheduler has been started.

val set_max_inter_cycle_timeout : Time_float_unix.Span.t -> unit

set_max_inter_cycle_timeout span sets the maximum amount of time the scheduler will remain blocked (on epoll or select) between cycles.

val set_check_invariants : bool -> unit

set_check_invariants do_check sets whether Async should check invariants of its internal data structures. set_check_invariants true can substantially slow down your program.

val set_detect_invalid_access_from_thread : bool -> unit

set_detect_invalid_access_from_thread do_check sets whether Async routines should check if they are being accessed from some thread other than the thread currently holding the Async lock, which is not allowed and can lead to very confusing behavior.

type 'b folder = {
  folder : 'a. 'b -> t -> (t, 'a) Core.Field.t -> 'b;
}
val fold_fields : init:'b -> 'b folder -> 'b

fold_fields ~init folder folds folder over each field in the scheduler. The fields themselves are not exposed -- folder must be a polymorphic function that can work on any field. So, it's only useful for generic operations, e.g., getting the size of each field.

val is_ready_to_initialize : unit -> bool
val is_initialized : unit -> bool
val reset_in_forked_process : unit -> unit

If a process that has already created, but not started, the Async scheduler would like to fork, and would like the child to have a clean Async, i.e., not inherit any of the Async work that was done in the parent, it can call reset_in_forked_process at the start of execution in the child process. After that, the child can do Async stuff and then start the Async scheduler.

reset_in_forked_process () is a no-op if is_initialized () = false and has undefined behavior if is_running () = true.
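The fork pattern described above can be sketched as follows, assuming the core_unix library for the fork call itself:

```ocaml
open Core
open Async

(* Sketch of the fork pattern above: the child resets Async state
   before doing any Async work of its own, then starts the scheduler.
   Assumes the [core_unix] library for [Core_unix.fork]. *)
let fork_with_clean_async child_main =
  match Core_unix.fork () with
  | `In_the_child ->
    Scheduler.reset_in_forked_process ();
    child_main ();
    never_returns (Scheduler.go ())
  | `In_the_parent pid -> pid
```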

val reset_in_forked_process_without_taking_lock : unit -> unit

reset_in_forked_process_without_taking_lock is similar to reset_in_forked_process, with the difference that the Async lock is not taken by the calling thread. This means it's not safe to do Async stuff unless you obtain the lock first, usually by calling functions from the Thread_safe module.

val make_async_unusable : unit -> unit

make_async_unusable () makes subsequent attempts to use the Async scheduler raise. One use case for make_async_unusable is if you fork from a process already running the Async scheduler, and want to run non-Async OCaml code in the child process, with the guarantee that the child process does not use Async.

val handle_thread_pool_stuck : (stuck_for:Time_ns_unix.Span.t -> unit) -> unit

handle_thread_pool_stuck f causes f to run whenever Async detects its thread pool is stuck (i.e., hasn't completed a job for over a second and has work waiting to start). Async checks every second. By default, if the thread pool has been stuck for less than 60s, Async will eprintf a message. If more than 60s, Async will send an exception to the main monitor, which will abort the program unless there is a custom handler for the main monitor.

Calling handle_thread_pool_stuck replaces whatever behavior was previously there.
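Replacing the default behavior is a one-liner; a sketch that logs how long the pool has been stuck:

```ocaml
open Core
open Async

(* Sketch: replace the default stuck-thread-pool behavior with a
   custom warning that reports how long the pool has been stuck. *)
let () =
  Scheduler.handle_thread_pool_stuck (fun ~stuck_for ->
    eprintf "thread pool stuck for %s\n" (Time_ns.Span.to_string stuck_for))
```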

val default_handle_thread_pool_stuck : Thread_pool.t -> stuck_for:Time_ns_unix.Span.t -> unit
val time_spent_waiting_for_io : unit -> Time_ns_unix.Span.t

time_spent_waiting_for_io () returns the amount of time that the Async scheduler has spent in calls to epoll_wait (or select) since the start of the program.

val set_min_inter_cycle_timeout : Time_ns_unix.Span.t -> unit

set_min_inter_cycle_timeout sets the minimum timeout that the scheduler will pass to the OS when it checks for I/O between cycles. The minimum is zero by default. Setting it to a nonzero value is used to increase thread fairness between the scheduler and other threads. A plausible setting is 10us. This can also be set via the ASYNC_CONFIG environment variable.

val fds_may_produce_events : unit -> bool

Returns true if any user-created fds are registered with the file descriptor watcher.

The intended use case for this function (together with thread_pool_has_unfinished_work) is approximate deadlock detection, so that a test can crash when it runs out of things to do.

val thread_pool_has_unfinished_work : unit -> bool

Returns true if any of the threads in the thread pool are in use. Note that this value can change from true to false "suddenly" (from a separate thread) when a thread pool thread finishes.

However, the work items submitted to the thread pool by In_thread.run enqueue their result as an external_job before finishing.

So if you observe thread_pool_has_unfinished_work () = false and then confirm that there are no external jobs in the scheduler, then you know you didn't miss any thread pool work.

val add_busy_poller : t -> max_busy_wait_duration:Time_ns_unix.Span.t -> Busy_poller.packed -> unit

If any busy pollers exist, they will be called in a busy loop whenever the scheduler is waiting on I/O before an Async cycle, with the guarantee that they will be called at least once before every Async cycle.

While the busy loop is running the program will only be responsive to the events detected by the pollers and to timing wheel alarms, but won't be responsive to anything else (signals, fd events, thread interruptions).

We do not allow the busy loop to run longer than max_inter_cycle_timeout. After adding the new busy poller, this function sets the max_inter_cycle_timeout to Time_ns.Span.min max_inter_cycle_timeout max_busy_wait_duration.

The pollers will run with the async lock held, so it's OK to access Async data structures, in particular to schedule jobs and alarms.

Pollers run in the main monitor.

val num_busy_pollers : t -> int
module External : sig ... end

Instead of calling Scheduler.go, the External module can be used to drive Async with an external loop, manually running regular Async cycles. This is useful to integrate Async with another event loop system.

module For_metrics : sig ... end
module For_tests : sig ... end