Cooperative system calls
This module maps system calls, like those of the standard library's Unix module, to cooperative ones, which will not block the program.
The semantics of all operations are as follows: if the action (for example, reading from a file descriptor) can be performed immediately, it is done and the call returns immediately; otherwise it returns a sleeping thread which is woken up when the operation completes.
Most operations on sockets and pipes (on Windows, only sockets) are cancelable, meaning you can cancel them with Lwt.cancel. For example, if you want to read something from a file descriptor with a timeout, you can cancel the action after the timeout, and the read will not be performed if it has not already been done.
For example, suppose you have two sockets, sock1 and sock2. You want to read from either sock1 or sock2, but not both, and fail with an exception if a timeout of 1 second expires, in which case nothing is read from sock1 or sock2 even if they later become readable.
In this case, it is guaranteed that exactly one of the three operations (the two reads and the timeout) will complete, and the others will be canceled.
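The two-socket scenario above can be sketched with Lwt.pick, which cancels the remaining threads as soon as one terminates. This is only an illustration: the function name read_first_or_timeout, the 1024-byte buffer size and the result tags are assumptions made for the example, and Lwt_unix.read is assumed to take a Bytes.t buffer as in recent Lwt releases.

```ocaml
(* Sketch: sock1 and sock2 are assumed to be connected Lwt_unix sockets. *)
let read_first_or_timeout sock1 sock2 =
  let len = 1024 in
  (* Tag each result so the caller can tell which socket produced the data. *)
  let read_from tag sock =
    let buf = Bytes.create len in
    Lwt.map
      (fun n -> (tag, Bytes.sub_string buf 0 n))
      (Lwt_unix.read sock buf 0 len)
  in
  Lwt.pick [
    Lwt_unix.timeout 1.0;     (* fails with Lwt_unix.Timeout after 1 second *)
    read_from `Sock1 sock1;
    read_from `Sock2 sock2;
  ]
```

If neither socket becomes readable within one second, the resulting thread fails with Lwt_unix.Timeout and both pending reads are canceled.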
val handle_unix_error : ('a -> 'b Lwt.t) -> 'a -> 'b Lwt.t
Same as Unix.handle_unix_error but catches lwt-level exceptions.
Configuration
type async_method =
| Async_none
(*
System calls are made synchronously, and may block the entire program.
*)
| Async_detach
(*
System calls are made in another system thread, thus without blocking other Lwt threads. The drawback is that it may degrade performance in some cases.
This is the default.
*)
| Async_switch
(*
System calls are made in the main thread, and if one blocks, execution continues in another system thread. This method is the most efficient, and you will get better performance if you force all threads to run on the same CPU. On Linux this can be done with the taskset command.
Note that this method is still experimental.
*)
For system calls that cannot be made asynchronously, Lwt uses one of the methods above.
Returns the underlying unix file descriptor. It always succeeds, even if the file descriptor's state is not Open.
val of_unix_file_descr :
?blocking:bool -> ?set_flags:bool -> Unix.file_descr -> file_descr
Creates an Lwt file descriptor from a Unix one.
blocking is the blocking mode of the file descriptor, and describes how Lwt will use it. In non-blocking mode, reads and writes on this file descriptor are made using non-blocking IO; in blocking mode they are made using the current async method. If blocking is not specified, it is guessed according to the file kind: sockets and pipes are in non-blocking mode and others are in blocking mode.
If set_flags is true (the default) then the file flags are modified according to the blocking argument, otherwise they are left unchanged.
Note that the blocking mode is less efficient than the non-blocking one, so it should be used only for file descriptors that do not support asynchronous operations, such as regular files, or for shared descriptors such as stdout, stderr or stdin.
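As an illustration of of_unix_file_descr, the sketch below wraps both ends of a plain Unix pipe in non-blocking mode and sends a short message through it. The helper names wrap_pipe and round_trip are hypothetical, and Lwt_unix.read and Lwt_unix.write are assumed to take Bytes.t buffers as in recent Lwt releases.

```ocaml
(* Wrap both ends of a Unix pipe. Pipes support non-blocking IO, so
   ~blocking:false matches what Lwt would guess on its own. *)
let wrap_pipe () =
  let r, w = Unix.pipe () in
  (Lwt_unix.of_unix_file_descr ~blocking:false r,
   Lwt_unix.of_unix_file_descr ~blocking:false w)

(* Write a short message into the pipe and read it back. *)
let round_trip msg =
  let open Lwt.Infix in
  let r, w = wrap_pipe () in
  let buf = Bytes.create (String.length msg) in
  Lwt_unix.write w (Bytes.of_string msg) 0 (String.length msg) >>= fun _ ->
  Lwt_unix.read r buf 0 (Bytes.length buf) >>= fun n ->
  Lwt_unix.close r >>= fun () ->
  Lwt_unix.close w >>= fun () ->
  Lwt.return (Bytes.sub_string buf 0 n)
```

For a regular file or a shared descriptor such as stdout, passing ~blocking:true instead would make Lwt go through the current async method.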
blocking fd returns whether fd is used in blocking or non-blocking mode.
val set_blocking : ?set_flags:bool -> file_descr -> bool -> unit
set_blocking fd b puts fd in blocking or non-blocking mode. If set_flags is true (the default) then the file flags are modified, otherwise the modification is only done at the application level.
abort fd exn makes all current and further uses of the file descriptor fail with the given exception. This puts the file descriptor into the Aborted state.
If the file descriptor is closed, this does nothing. If it is already aborted, this replaces the abort exception with exn.
Note that this only works for reading and writing operations on file descriptors supporting non-blocking mode.
Process handling
val fork : unit -> int
fork () does the same as Unix.fork. You must use this function instead of Unix.fork when you want to use Lwt in the child process.
Notes:
in the child process all pending jobs are canceled,
if you are going to use Lwt in the parent and the child, it is a good idea to call Lwt_io.flush_all before calling fork to avoid double-flush.
wait4 flags pid returns (pid, status, rusage) where (pid, status) is the same result as Unix.waitpid flags pid, and rusage contains accounting information about the child.
On Windows, rusage is always { utime = 0.0; stime = 0.0 }.
val wait_count : unit -> int
Returns the number of threads waiting for a child to terminate.
Executes the given command, waits until it terminates, and returns its termination status. The string is interpreted by the shell /bin/sh on Unix and cmd.exe on Windows. The result WEXITED 127 indicates that the shell couldn't be executed.
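A minimal sketch of interpreting the termination status returned by Lwt_unix.system follows. The helper name run_and_describe and the wording of the messages are assumptions made for the example.

```ocaml
(* Run a shell command cooperatively and summarize how it terminated. *)
let run_and_describe command =
  Lwt.map
    (function
      | Unix.WEXITED 0 -> "success"
      | Unix.WEXITED code -> Printf.sprintf "exited with code %d" code
      | Unix.WSIGNALED s -> Printf.sprintf "killed by signal %d" s
      | Unix.WSTOPPED s -> Printf.sprintf "stopped by signal %d" s)
    (Lwt_unix.system command)
```

For instance, on a Unix system, Lwt_main.run (run_and_describe "exit 3") evaluates to "exited with code 3".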
readdir_n handle count reads at most count entries from the given directory. It is more efficient than calling readdir count times. If the length of the returned array is smaller than count, this means that the end of the directory has been reached.
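The end-of-directory rule described above suggests a simple loop: keep calling readdir_n until it returns fewer entries than requested. The sketch below assumes the companion functions Lwt_unix.opendir and Lwt_unix.closedir; the name list_directory and the default chunk size of 64 are choices made for the example.

```ocaml
(* List every entry of a directory, reading [chunk] entries per call. *)
let list_directory ?(chunk = 64) path =
  let open Lwt.Infix in
  Lwt_unix.opendir path >>= fun handle ->
  let rec loop acc =
    Lwt_unix.readdir_n handle chunk >>= fun entries ->
    let acc = List.rev_append (Array.to_list entries) acc in
    (* A short array means the end of the directory has been reached. *)
    if Array.length entries < chunk then Lwt.return (List.rev acc)
    else loop acc
  in
  (* Close the handle whether or not the loop succeeds. *)
  Lwt.finalize (fun () -> loop []) (fun () -> Lwt_unix.closedir handle)
```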
pipe_in () is the same as pipe but maps only the Unix file descriptor for reading into an Lwt one. The second is not put into non-blocking mode. You usually want to use this before forking to receive data from the child process.
val pipe_out : unit -> Unix.file_descr * file_descr
pipe_out () is the inverse of pipe_in. You usually want to use this before forking to send data to the child process.
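The following sketch shows one way pipe_in and fork might be combined so that a parent receives a message from its child. The function name receive_from_child and the 4096-byte buffer are assumptions for the example, which also relies on Lwt_unix.waitpid and targets Unix-like systems only. It is meant to be called outside of Lwt_main.run.

```ocaml
(* Fork a child that sends [message] back to the parent through a pipe. *)
let receive_from_child message =
  let from_child, to_parent = Lwt_unix.pipe_in () in
  match Lwt_unix.fork () with
  | 0 ->
    (* Child: the write end is a plain Unix descriptor, so use Unix calls. *)
    ignore (Unix.write_substring to_parent message 0 (String.length message));
    Unix.close to_parent;
    exit 0
  | pid ->
    (* Parent: close the unused write end, then read cooperatively. *)
    Unix.close to_parent;
    let open Lwt.Infix in
    let buf = Bytes.create 4096 in
    Lwt_unix.read from_child buf 0 (Bytes.length buf) >>= fun n ->
    Lwt_unix.close from_child >>= fun () ->
    Lwt_unix.waitpid [] pid >>= fun (_, status) ->
    Lwt.return (Bytes.sub_string buf 0 n, status)
```

A single read is enough here only because the message is short; a longer payload would need a loop that reads until end of file.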
on_signal signum f calls f each time the signal with number signum is received by the process. It returns a signal handler identifier that can be used to stop monitoring signum.
reinstall_signal_handler signum reinstalls the signal handler (with Sys.set_signal) if any handler is registered for this signal with on_signal. This is useful in case another part of the program installs another signal handler.
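As a sketch of how on_signal can be turned into a thread that terminates on the first delivery of a signal, consider the helper below. The name wait_for_signal is hypothetical, and the example assumes that the identifier returned by on_signal can be passed to Lwt_unix.disable_signal_handler to stop monitoring.

```ocaml
(* Return a thread that terminates the first time [signum] is received. *)
let wait_for_signal signum =
  let waiter, wakener = Lwt.wait () in
  let id =
    Lwt_unix.on_signal signum (fun _received ->
      (* Guard against the signal being delivered more than once. *)
      if Lwt.is_sleeping waiter then Lwt.wakeup wakener ())
  in
  (* Stop monitoring the signal once it has been handled. *)
  Lwt.map (fun () -> Lwt_unix.disable_signal_handler id) waiter
```

For example, wait_for_signal Sys.sigusr1 could be combined with other threads using Lwt.pick to interrupt a long-running operation when the process receives SIGUSR1.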
recv_msg ~socket ~io_vectors receives data into a list of io-vectors, plus any file-descriptors that may accompany the messages. It returns a tuple whose first field is the number of bytes received and second is a list of received file descriptors. The messages themselves will be recorded in the provided io_vectors list.
This call is not available on Windows.
val send_msg :
socket:file_descr -> io_vectors:io_vector list -> fds:Unix.file_descr list -> int Lwt.t
send_msg ~socket ~io_vectors ~fds sends data from a list of io-vectors, accompanied with a list of file-descriptors. It returns the number of bytes sent. If fd-passing is not possible on the current system and fds is not empty, it raises Lwt_sys.Not_available "fd_passing".
get_credentials fd returns credentials information from the given socket. On some platforms, obtaining the peer pid is not possible and it will be set to -1. If obtaining credentials is not possible on the current system, it raises Lwt_sys.Not_available "get_credentials".
This call is not available on Windows.
Socket options
type socket_bool_option = Unix.socket_bool_option =
| SO_DEBUG
| SO_BROADCAST
| SO_REUSEADDR
| SO_KEEPALIVE
| SO_DONTROUTE
| SO_OOBINLINE
| SO_ACCEPTCONN
| TCP_NODELAY
| IPV6_ONLY
type socket_int_option = Unix.socket_int_option =
| SO_SNDBUF
| SO_RCVBUF
| SO_ERROR
| SO_TYPE
| SO_RCVLOWAT
| SO_SNDLOWAT
type socket_optint_option = Unix.socket_optint_option =
| SO_LINGER
type socket_float_option = Unix.socket_float_option =
wrap_syscall set fd action wraps an action on a file descriptor. It tries to execute action, and if it cannot be performed immediately without blocking, it is registered for later.
In the latter case, if the thread is canceled, action is removed from set.
Type of job descriptions. A job description describes how to call a C function and how to get its result. The C function may be executed in another system thread.
val execute_job :
?async_method:async_method -> job:'a job -> result:('a job -> 'b) -> free:('a job -> unit) -> 'b Lwt.t
This is the old and deprecated way of running a job. Use run_job in new code.
If the method is Async_none then the job is run synchronously and may block the current system thread, thus blocking all Lwt threads.
If the method is Async_detach then the job is run in another system thread, unless the maximum number of worker threads has been reached (as given by pool_size).
If the method is Async_switch then the job is run synchronously and if it blocks, execution will continue in another system thread (unless the limit is reached).
val abort_jobs : exn -> unit
abort_jobs exn makes all pending jobs fail with exn. Note that this does not abort the real job (i.e. the C function executing it), just the lwt thread for it.
val cancel_jobs : unit -> unit
cancel_jobs () is the same as abort_jobs Lwt.Canceled.
Lwt internally uses a pipe to send notifications to the main thread. The following functions allow you to use this pipe.
val make_notification : ?once:bool -> (unit -> unit) -> int
make_notification ?once f registers a new notifier. It returns the id of the notifier. Each time a notification with this id is received, f is called.
If once is specified, then the notification is stopped after the first time it is received. It defaults to false.
val send_notification : int -> unit
send_notification id sends a notification.
This function is thread-safe.
val stop_notification : int -> unit
Stops the given notification. Note that you should not reuse the id after the notification has been stopped; the result is unspecified if you do so.
val call_notification : int -> unit
Calls the handler associated with the given notification. Note that if the notification was defined with once = true it is removed.
val set_notification : int -> (unit -> unit) -> unit
set_notification id f replaces the function associated with the notification by f. It raises Not_found if the given notification is not found.
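The notification functions above can be exercised as in the following sketch. The names wait_for_notification, counter and handler_demo are hypothetical. The first helper shows the usual pattern of waking an Lwt thread from a notification; the second calls handlers directly with call_notification, which does not go through the pipe.

```ocaml
(* Register a one-shot notification that wakes up an Lwt thread.
   The returned id can be passed to Lwt_unix.send_notification,
   which may be called from any system thread. *)
let wait_for_notification () =
  let waiter, wakener = Lwt.wait () in
  let id =
    Lwt_unix.make_notification ~once:true (fun () -> Lwt.wakeup wakener ())
  in
  (id, waiter)

let counter = ref 0

(* Call, replace and stop a handler without going through the pipe. *)
let handler_demo () =
  let id = Lwt_unix.make_notification (fun () -> incr counter) in
  Lwt_unix.call_notification id;                       (* runs incr *)
  Lwt_unix.set_notification id (fun () -> counter := !counter + 10);
  Lwt_unix.call_notification id;                       (* runs the new handler *)
  Lwt_unix.stop_notification id;
  !counter
```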
System threads pool
If the program is using the async method Async_detach or Async_switch, Lwt will launch system threads to execute blocking system calls asynchronously.
val pool_size : unit -> int
Maximum number of system threads that can be started. If this limit is reached, jobs will be executed synchronously.
val set_pool_size : int -> unit
Change the size of the pool.
val thread_count : unit -> int
The number of system threads running (excluding this one).
val thread_waiting_count : unit -> int
The number of threads waiting for a job.
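A small sketch of adjusting the pool size for the duration of one computation and restoring it afterwards is given below. The helper name with_pool_size is an assumption made for the example; only pool_size and set_pool_size come from this module.

```ocaml
(* Run [f] with the pool limited to [n] system threads, then restore
   the previous limit whether [f] succeeds or fails. *)
let with_pool_size n f =
  let previous = Lwt_unix.pool_size () in
  Lwt_unix.set_pool_size n;
  Lwt.finalize f (fun () ->
    Lwt_unix.set_pool_size previous;
    Lwt.return ())
```

Keep in mind that once the limit is reached, jobs are executed synchronously, so a very small pool can make blocking system calls stall other Lwt threads.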
CPUs
val get_cpu : unit -> int
get_cpu () returns the number of the CPU the current thread is running on.
val get_affinity : ?pid:int -> unit -> int list
get_affinity ?pid () returns the list of CPUs the process with pid pid is allowed to run on. If pid is not specified then the affinity of the current process is returned.
val set_affinity : ?pid:int -> int list -> unit
set_affinity ?pid cpus sets the list of CPUs the given process is allowed to run on.