package tezt
Module Scheduler
Running tasks on a set number of forked processes.
The main content of this module is:
- a function add_task to add tasks to a queue;
- a function run to run those tasks in separate worker processes;
- a module Message to send messages to and from workers;
- a module Timer to schedule delayed functions.
The scheduler (i.e. run) maintains a pool of workers. Workers receive tasks from the scheduler, execute them and send the result to the scheduler. Tasks can be given a limited amount of time to run. New tasks can be added (with add_task) while the scheduler is running, in response to events such as:
- a task being started;
- a task finishing (successfully or not);
- a message being received from a worker (i.e. from a running task);
- the task queue becoming empty.
Examples of messages include:
- logs: tasks can send log messages to the scheduler, so that the scheduler can write them in a single file with no interleaving;
- global resource queries: workers can query a resource such as a free port number, and the scheduler can respond with this resource.
Messages are meant to be used while a task is running. The return value of a task is also sent to the scheduler as a message, but this is handled transparently.
Examples of use cases for this library include:
- a test framework that wants to sandbox tests in parallel and in separate processes (e.g. to be able to recover from crashes, and to kill them if they take too long);
- a build system that wants to compile multiple targets in parallel, and to add the reverse dependencies of a target as new tasks once this target is built (such as make -j).
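The build-system use case above can be pictured with a toy, single-process model. This is a sketch under loud assumptions: add_task and run below are hypothetical stand-ins written for illustration, not the Scheduler functions, and tasks run one after another in the current process instead of in forked workers. What it shows is the pattern of queuing the reverse dependencies of a target from the code that runs when that target is done.

```ocaml
(* Toy, single-process model of the "make -j" use case. [add_task] and
   [run] are hypothetical stand-ins for the Scheduler functions: tasks run
   sequentially in this process and nothing is forked. *)
let queue : (unit -> unit) Queue.t = Queue.create ()
let add_task (f : unit -> unit) = Queue.add f queue
let run () = while not (Queue.is_empty queue) do (Queue.pop queue) () done

(* Each target with its direct dependencies. *)
let deps = [ ("app", [ "lib"; "util" ]); ("lib", [ "util" ]); ("util", []) ]
let built : (string, unit) Hashtbl.t = Hashtbl.create 8
let order = ref []

let rec build target =
  add_task (fun () ->
      order := target :: !order;
      Hashtbl.replace built target ();
      (* Like an on_finish handler: queue every reverse dependency whose
         dependencies are now all built. *)
      List.iter
        (fun (t, ds) ->
          if List.mem target ds
             && (not (Hashtbl.mem built t))
             && List.for_all (Hashtbl.mem built) ds
          then build t)
        deps)

let () =
  build "util";
  run ();
  print_endline (String.concat " " (List.rev !order))
```

Starting from the leaf target util, the program builds util, then lib, then app, each one being queued only once all of its dependencies are built.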
Contexts
Message sending functions are supposed to be run from a specific context:
- the worker process, which can send messages to and receive messages from the scheduler;
- or the scheduler process, with a specific worker in mind from which to receive messages and to which to send them.
Those message sending functions take a value of type worker_context or scheduler_context as proof. (There are ways to leak the contexts to other contexts, but doing so would be a programming error.)
worker_context: values used by message sending functions meant to be called from a worker.
scheduler_context: values used by message sending functions meant to be called from the scheduler.
Get the current worker context. Returns None if not currently in a worker process.
Messages
Timers
Task Queue
val add_task :
?sigterm:int ->
?term_timeout:float ->
?kill_timeout:float ->
?on_term_timeout:(unit -> unit) ->
?on_kill_timeout:(unit -> unit) ->
?on_start:(scheduler_context -> unit) ->
?on_message:(scheduler_context -> Message.t -> unit) ->
?on_finish:(('a, string) result -> unit) ->
'a Message.typ ->
(worker_context -> 'a) ->
unit
Add a task to the queue.
Usage: add_task typ execute
typ is a type description for values returned by execute. When a worker is ready to execute this task, this worker will run execute. Note that execute is serialized to the worker using Marshal. If this closure captures some variables, those variables should thus be serializable using Marshal.
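To make "serializable using Marshal" concrete, the following standalone sketch (no tezt dependency; make_execute and roundtrip are hypothetical names) marshals a closure together with the variables it captures and reads it back, roughly as a task would travel to a worker. Marshal.Closures is the flag the standard Marshal module requires for functional values; whether the scheduler uses exactly this call is an assumption.

```ocaml
(* [base] and [names] are captured by the returned closure. They are plain
   data (an int and a string list), so Marshal can serialize them. *)
let make_execute (base : int) (names : string list) : unit -> string =
 fun () ->
  Printf.sprintf "%d:%s" (base + List.length names) (String.concat "," names)

(* [Marshal.Closures] is required for functional values; the bytes can only
   be read back by the same program, which a forked child is. *)
let roundtrip (f : unit -> string) : unit -> string =
  let bytes = Marshal.to_string f [ Marshal.Closures ] in
  (Marshal.from_string bytes 0 : unit -> string)

let () = print_endline ((roundtrip (make_execute 40 [ "a"; "b" ])) ())
```

If the closure captured a value that Marshal cannot serialize, such as an in_channel or out_channel, Marshal.to_string would raise instead of producing bytes.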
add_task can be called before run, or while run is running (i.e. from an event handler like on_start, on_message, on_finish; or the on_empty_queue argument of run).
If term_timeout is specified, sigterm is sent to the worker if the task has not finished (successfully or not) after term_timeout seconds. sigterm defaults to Sys.sigterm.
If kill_timeout is specified but term_timeout is not, SIGKILL is sent to the worker if the task has not finished (successfully or not) after kill_timeout seconds.
If both term_timeout and kill_timeout are specified, sigterm is sent first, and if the task has still not ended gracefully kill_timeout seconds after sigterm was sent, SIGKILL is sent as well. Note that in that case, kill_timeout is relative to the time sigterm was sent, not to the time the task started.
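The interaction between term_timeout and kill_timeout can be summarized by a small pure function. signal_schedule is a hypothetical helper, not part of the module: it lists which signals would be sent and when, in seconds since the task started, assuming the task never finishes on its own.

```ocaml
(* Hypothetical helper, not part of Scheduler: signals that the add_task
   timeouts would send to a task that never finishes, each paired with the
   time it is sent, in seconds since the task started. "sigterm" stands for
   whatever signal was passed as ?sigterm. *)
let signal_schedule ?term_timeout ?kill_timeout () : (float * string) list =
  match (term_timeout, kill_timeout) with
  | None, None -> []
  | None, Some kill -> [ (kill, "SIGKILL") ]
  | Some term, None -> [ (term, "sigterm") ]
  | Some term, Some kill ->
      (* kill_timeout counts from when sigterm was sent, not from the start. *)
      [ (term, "sigterm"); (term +. kill, "SIGKILL") ]

let () =
  List.iter
    (fun (at, signal) -> Printf.printf "%s after %.1fs\n" signal at)
    (signal_schedule ~term_timeout:10. ~kill_timeout:5. ())
```

With term_timeout 10 and kill_timeout 5, SIGKILL is due 15 seconds after the start, not 5.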
on_start is triggered when the task is sent to a worker. It takes a scheduler_context argument that can be used to send a message to this worker, typically with additional information that was not known at the time the task was queued, such as a free port number that the worker can use.
on_message is triggered for each message that is sent from the worker. It also takes a scheduler_context argument to be able to respond.
on_finish is triggered with:
- Ok result when the task returns successfully, in which case result is the return value of execute;
- Error error_message when the task fails, in which case error_message can be the result of Printexc.to_string (if execute raised an exception) or something else (e.g. if the worker died).
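An on_finish handler only needs to match on the result. The sketch below assumes a task whose execute returns an int; the results table and the name argument are hypothetical, used only to record outcomes.

```ocaml
(* Hypothetical table collecting the outcome of each task by name. *)
let results : (string, string) Hashtbl.t = Hashtbl.create 8

(* Has the documented shape ('a, string) result -> unit once [name] is
   applied, here with 'a = int. *)
let on_finish (name : string) : (int, string) result -> unit = function
  | Ok n -> Hashtbl.replace results name (Printf.sprintf "ok %d" n)
  | Error msg -> Hashtbl.replace results name ("failed: " ^ msg)

let () =
  on_finish "a" (Ok 42);
  (* If execute raised, the message may come from Printexc.to_string. *)
  on_finish "b" (Error (Printexc.to_string (Failure "boom")));
  Hashtbl.iter (fun name outcome -> Printf.printf "%s: %s\n" name outcome) results
```

Partially applying the handler, as in ~on_finish:(on_finish "a"), gives a function of the expected type (int, string) result -> unit.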
Clear the queue of tasks.
This has no effect on tasks that are already running, because they have been removed from the queue.
Main Loop
val run :
?worker_idle_timeout:float ->
?worker_kill_timeout:float ->
?on_worker_kill_timeout:(unit -> unit) ->
?on_empty_queue:(unit -> unit) ->
?on_message:(Message.t -> unit) ->
?on_unexpected_worker_exit:(Unix.process_status -> unit) ->
fork:(unit -> int) ->
int ->
unit
Run tasks until the queue is empty.
on_empty_queue is called when a worker is available and the task queue is empty. In particular, it can use add_task to fill the queue.
on_message is called when a worker emits a message while not executing a task. This can happen in particular if you use at_exit. Messages received from a worker which is running a task are passed to the on_message of the corresponding add_task call instead.
When a worker exits:
- if it is running a task, the task fails (its on_finish is triggered with Error);
- if it is not running a task and the queue is not empty, or if the exit code is not 0, on_unexpected_worker_exit is called so that you can emit a warning.
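The rules for a worker exit can be restated as a small classification function. This is a hypothetical model, not library code, and it simplifies the exit status to an integer exit code instead of Unix.process_status.

```ocaml
(* Hypothetical model of what happens when a worker exits. *)
type worker_exit_outcome =
  | Task_fails (* the task's on_finish is triggered with Error *)
  | Unexpected_exit (* run's on_unexpected_worker_exit is called *)
  | Nothing_reported (* idle worker exiting with code 0 and an empty queue *)

let classify_exit ~running_task ~queue_empty ~exit_code =
  if running_task then Task_fails
  else if (not queue_empty) || exit_code <> 0 then Unexpected_exit
  else Nothing_reported

let () =
  match classify_exit ~running_task:false ~queue_empty:true ~exit_code:1 with
  | Unexpected_exit -> print_endline "on_unexpected_worker_exit would be called"
  | Task_fails | Nothing_reported -> print_endline "no warning"
```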
If worker_idle_timeout is specified, workers stop if they are not given any task after worker_idle_timeout seconds of doing nothing. This can be useful to prevent workers from running forever, although in general they should detect that the scheduler is dead by receiving end of file while trying to receive their next task.
If worker_kill_timeout is specified, send SIGKILL to workers if they are still running worker_kill_timeout seconds after they were told to stop. This only applies when they were told to stop because the task queue is empty. When this happens, on_worker_kill_timeout is called.
fork is supposed to be Unix.fork. But if tasks may use Lwt, it should be Lwt_unix.fork instead. You can also wrap fork to run some code on each fork, for instance to initialize some global variables when a worker starts.
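Running code in each new worker only requires acting when fork returns 0, i.e. in the child process. The sketch below uses a hypothetical with_worker_init helper and simulates both sides with fake fork functions, so it runs without actually forking.

```ocaml
(* Hypothetical helper: wraps a fork function so that [init] runs in every
   new child, i.e. every new worker. In a real program [fork] would be
   Unix.fork, or Lwt_unix.fork if tasks use Lwt. *)
let with_worker_init (fork : unit -> int) (init : unit -> unit) : unit -> int =
 fun () ->
  let pid = fork () in
  (* fork returns 0 in the child and the child's pid in the parent. *)
  if pid = 0 then init ();
  pid

let () =
  let init_count = ref 0 in
  let init () = incr init_count in
  (* Fake forks simulate the child side (0) and the parent side (a pid). *)
  ignore ((with_worker_init (fun () -> 0) init) ());
  ignore ((with_worker_init (fun () -> 1234) init) ());
  Printf.printf "init ran %d time(s)\n" !init_count
```

In an actual program one would presumably pass something like ~fork:(with_worker_init Unix.fork init) to run, where init is your own initialization function.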
The last argument is the maximum number of tasks to run in parallel.
This function is blocking. It returns once:
- no task is currently running;
- the queue is empty (unless you called stop);
- and no timer is currently active.
In particular, it returns immediately if you never called add_task or Timer.on_delay.
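The conditions under which run returns can be restated as a predicate over the scheduler's state. run_should_return and its arguments are hypothetical names standing for internal state, used only to make the conditions explicit.

```ocaml
(* Hypothetical restatement of when run returns. *)
let run_should_return ~running_tasks ~queued_tasks ~stopped ~active_timers =
  running_tasks = 0 && (queued_tasks = 0 || stopped) && active_timers = 0

let () =
  (* Nothing was ever added and no timer was set: run returns immediately. *)
  Printf.printf "%b\n"
    (run_should_return ~running_tasks:0 ~queued_tasks:0 ~stopped:false
       ~active_timers:0)
```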
Stop the current run.
This function is meant to be called from the event handlers (on_ functions) of tasks. It does nothing if run is not currently running.
This function is not blocking. It causes run to stop starting new tasks. It also causes run to consider that all current tasks have passed their term_timeout, even if they do not actually have such a timeout, except that on_term_timeout is not triggered. In other words, all workers receive SIGTERM if they have not already.
Calling add_task still adds tasks to the queue, but they will not be started unless you call run again.
This function does not cancel timers.
Miscellaneous
Convert a process status to a human-readable string.
Example results:
"exited with code 0"
"was killed by SIGTERM"
"was stopped by unknown signal (-100)"
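A function producing strings like the examples above could look like the following sketch. It is not the library's implementation: the status type mirrors the constructors of Unix.process_status to avoid a Unix dependency, and only SIGTERM, SIGKILL and SIGINT are named. OCaml signal constants such as Sys.sigterm are small negative numbers, so -100 falls into the unknown case.

```ocaml
(* Mirrors the constructors of Unix.process_status (assumption made to
   keep this sketch free of the Unix library). *)
type status = WEXITED of int | WSIGNALED of int | WSTOPPED of int

(* Only a few signals are named here; anything else is "unknown". *)
let signal_to_string s =
  if s = Sys.sigterm then "SIGTERM"
  else if s = Sys.sigkill then "SIGKILL"
  else if s = Sys.sigint then "SIGINT"
  else Printf.sprintf "unknown signal (%d)" s

let show_status = function
  | WEXITED code -> Printf.sprintf "exited with code %d" code
  | WSIGNALED s -> "was killed by " ^ signal_to_string s
  | WSTOPPED s -> "was stopped by " ^ signal_to_string s

let () =
  List.iter
    (fun st -> print_endline (show_status st))
    [ WEXITED 0; WSIGNALED Sys.sigterm; WSTOPPED (-100) ]
```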