package async
Module Rpc.Connection
include module type of struct include Async_rpc_kernel.Rpc.Connection end
After add_heartbeat_callback t f, f () will be called after every subsequent heartbeat received by t.
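As a minimal sketch of how the callback might be used, the helper below counts heartbeats on a connection. The name count_heartbeats is hypothetical, and the sketch assumes add_heartbeat_callback has type t -> (unit -> unit) -> unit, as the description above suggests.

```ocaml
open! Core
open Async

(* Hypothetical helper: returns a counter that is incremented once for
   every heartbeat received on [conn] after this call. *)
let count_heartbeats (conn : Rpc.Connection.t) : int ref =
  let count = ref 0 in
  Rpc.Connection.add_heartbeat_callback conn (fun () -> Int.incr count);
  count
```

Only heartbeats that arrive after registration are counted, so the counter starts at zero.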
Changes the heartbeat timeout and restarts the timer by setting last_seen_alive to the current time.
The last time that either a message was received or reset_heartbeat_timeout was called.
val close :
?streaming_responses_flush_timeout:Core.Time_ns.Span.t ->
?reason:Core.Info.t ->
t ->
unit Async_kernel.Deferred.t
close starts closing the connection's transport, and returns a deferred that becomes determined when its close completes. It is ok to call close multiple times on the same t; calls subsequent to the initial call will have no effect, but will return the same deferred as the original call.
Before closing the underlying transport's writer, close waits for all streaming responses to be Pipe.upstream_flushed with a timeout of streaming_responses_flush_timeout.
The reason for closing the connection will be passed to callers of close_reason.
close_finished becomes determined after the close of the connection's transport completes, i.e. it is the same deferred that close returns. close_finished differs from close in that it does not have the side effect of initiating a close.
val close_reason :
t ->
on_close:[ `started | `finished ] ->
Core.Info.t Async_kernel.Deferred.t
close_reason ~on_close t becomes determined when close starts or finishes based on on_close, but additionally returns the reason that the connection was closed.
is_closed t returns true iff close t has been called. close may be called internally upon errors or timeouts.
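The interplay between close, close_finished, close_reason and is_closed can be sketched as follows. The function name close_with_reason and the reason text are illustrative assumptions; the Info.t that close_reason yields may carry more context than the value passed to close, so the sketch does not assume they are equal.

```ocaml
open! Core
open Async

(* Hypothetical helper: closes [conn] with an explicit reason and returns
   the reason reported once the close has finished. *)
let close_with_reason (conn : Rpc.Connection.t) : Info.t Deferred.t =
  (* Ask for the reason first; this does not initiate a close. *)
  let reason = Rpc.Connection.close_reason ~on_close:`finished conn in
  Rpc.Connection.close ~reason:(Info.of_string "work complete") conn
  >>= fun () ->
  (* [close] has been called, so [is_closed] holds from here on. *)
  assert (Rpc.Connection.is_closed conn);
  (* Already determined at this point: it is the deferred [close] returned. *)
  Rpc.Connection.close_finished conn
  >>= fun () -> reason
```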
bytes_to_write and flushed just call the similarly named functions on the Transport.Writer.t within a connection.
bytes_written just calls the similarly named function on the Transport.Writer.t within a connection.
bytes_read just calls the similarly named function on the Transport.Reader.t within a connection.
Peer menu will become determined before any other messages are received. The menu is sent automatically on creation of a connection. If the peer is using an older version, the value is immediately determined to be None. If the connection is closed before the menu is received, an error is returned.
It is expected that one will call Versioned_rpc.Connection_with_menu.create instead of this function and that will request the menu via rpc if it gets None.
Like peer_menu but returns an rpc result.
Peer identification will become determined before any other messages are received. If the peer is using an older version, the peer id is immediately determined to be None. If the connection is closed before the menu is received, None is returned.
val create :
?implementations:'s Implementations.t ->
connection_state:(t -> 's) ->
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
(t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
These functions are mostly the same as the ones with the same names in Async_rpc_kernel.Rpc.Connection; see Connection_intf in that library for documentation. The differences are that:
- they take an Async_unix.Reader.t, Async_unix.Writer.t and max_message_size instead of a Transport.t
- they use Time instead of Time_ns
As of Feb 2017, the RPC protocol started to contain a magic number so that one can identify RPC communication. The bool returned by contains_magic_prefix says whether this magic number was observed.
This operation is a "peek" that does not advance any pointers associated with the reader. In particular, it makes sense to call create on a reader after calling this function.
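A possible way to combine the peek with create is sketched below. The helper name connect_if_rpc and the error message are hypothetical, and the sketch assumes contains_magic_prefix takes a Reader.t and returns a bool Deferred.t, which the page describes but does not spell out as a signature.

```ocaml
open! Core
open Async

(* Hypothetical helper: only performs the RPC handshake when the peer's
   first bytes look like RPC traffic. *)
let connect_if_rpc (reader : Reader.t) (writer : Writer.t)
  : Rpc.Connection.t Or_error.t Deferred.t
  =
  Rpc.Connection.contains_magic_prefix reader
  >>= function
  | false ->
    return (Or_error.error_string "peer did not send the RPC magic number")
  | true ->
    (* The peek consumed nothing, so [create] still sees the full stream. *)
    Rpc.Connection.create ~connection_state:(fun _conn -> ()) reader writer
    >>| Result.map_error ~f:(fun exn -> Error.of_exn exn)
```

No implementations are supplied here, so the resulting connection can dispatch queries but will not serve any.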
val with_close :
?implementations:'s Implementations.t ->
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
connection_state:(t -> 's) ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
dispatch_queries:(t -> 'a Async_kernel.Deferred.t) ->
on_handshake_error:
[ `Raise | `Call of Core.Exn.t -> 'a Async_kernel.Deferred.t ] ->
'a Async_kernel.Deferred.t
val server_with_close :
?max_message_size:int ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
Async_unix.Reader.t ->
Async_unix.Writer.t ->
implementations:'s Implementations.t ->
connection_state:(t -> 's) ->
on_handshake_error:
[ `Raise | `Ignore | `Call of Core.Exn.t -> unit Async_kernel.Deferred.t ] ->
unit Async_kernel.Deferred.t
A function creating a transport from a file descriptor. It is responsible for setting the low-level parameters of the underlying transport.
For instance, to set up a transport using Async.{Reader,Writer} and set a buffer age limit on the writer, you can pass this to the functions of this module:
~make_transport:(fun fd ~max_message_size ->
Rpc.Transport.of_fd fd ~max_message_size ~buffer_age_limit:`Unlimited)
val serve :
implementations:'s Implementations.t ->
initial_connection_state:('address -> t -> 's) ->
where_to_listen:('address, 'listening_on) Async_unix.Tcp.Where_to_listen.t ->
?max_connections:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:('address -> bool) ->
?on_handshake_error:[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] ->
?on_handler_error:[ `Raise | `Ignore | `Call of 'address -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
unit ->
('address, 'listening_on) Async_unix.Tcp.Server.t Async_kernel.Deferred.t
serve implementations ~port ?on_handshake_error () starts a server with the given implementation on port. The optional auth function will be called on all incoming connections with the address info of the client and will disconnect the client immediately if it returns false. This auth mechanism is generic and does nothing other than disconnect the client; any logging or record of the reasons is the responsibility of the auth function itself.
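Since auth only disconnects the client, any logging has to happen inside the function itself. The sketch below shows one way this might look; is_loopback and serve_loopback_only are hypothetical names, the loopback policy is just an example, and the unit connection state is an assumption made to keep the example small.

```ocaml
open! Core
open Async

(* Hypothetical policy: accept only clients connecting from 127.0.0.1. *)
let is_loopback (addr : Socket.Address.Inet.t) : bool =
  String.equal
    (Unix.Inet_addr.to_string (Socket.Address.Inet.addr addr))
    "127.0.0.1"

(* Starts a server on [port] that rejects, and logs, non-loopback clients. *)
let serve_loopback_only ~(implementations : unit Rpc.Implementations.t) ~port =
  Rpc.Connection.serve
    ~implementations
    ~initial_connection_state:(fun _addr _conn -> ())
    ~where_to_listen:(Tcp.Where_to_listen.of_port port)
    ~auth:(fun addr ->
      let ok = is_loopback addr in
      (* Logging is the auth function's job; [serve] only disconnects. *)
      if not ok then eprintf "rejected %s\n" (Socket.Address.Inet.to_string addr);
      ok)
    ()
```

A caller could pass Rpc.Implementations.null () as implementations to try the server out before any RPCs are defined.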
val serve_inet :
implementations:'s Implementations.t ->
initial_connection_state:(Async_unix.Socket.Address.Inet.t -> t -> 's) ->
where_to_listen:Async_unix.Tcp.Where_to_listen.inet ->
?max_connections:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:(Async_unix.Socket.Address.Inet.t -> bool) ->
?on_handshake_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Inet.t -> exn -> unit ] ->
?on_handler_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Inet.t -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
unit ->
(Async_unix.Socket.Address.Inet.t, int) Async_unix.Tcp.Server.t
As serve, but only accepts IP addresses, not Unix sockets; returns server immediately rather than asynchronously.
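Because serve_inet returns the server directly, the listening port is available without waiting on a deferred. A minimal sketch, assuming a unit connection state and using the hypothetical name serve_on_free_port:

```ocaml
open! Core
open Async

(* Hypothetical helper: starts a server on an OS-chosen port and returns
   the server together with the port it is listening on. *)
let serve_on_free_port ~(implementations : unit Rpc.Implementations.t) =
  let server =
    Rpc.Connection.serve_inet
      ~implementations
      ~initial_connection_state:(fun _addr _conn -> ())
      ~where_to_listen:Tcp.Where_to_listen.of_port_chosen_by_os
      ()
  in
  (* For inet servers, ['listening_on] is the port number. *)
  server, Tcp.Server.listening_on server
```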
val serve_unix :
implementations:'s Implementations.t ->
initial_connection_state:
(Async_unix.Socket.Address.Unix.t ->
Linux_ext.Peer_credentials.t ->
t ->
's) ->
where_to_listen:Async_unix.Tcp.Where_to_listen.unix ->
?max_connections:int ->
?backlog:int ->
?drop_incoming_connections:bool ->
?time_source:[> Core.read ] Async_kernel.Time_source.T1.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?auth:(Async_unix.Socket.Address.Unix.t -> bool) ->
?on_handshake_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Unix.t -> exn -> unit ] ->
?on_handler_error:
[ `Raise
| `Ignore
| `Call of Async_unix.Socket.Address.Unix.t -> exn -> unit ] ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
unit ->
Async_unix.Tcp.Server.unix Async_kernel.Deferred.t
As serve, but only accepts Unix sockets; provides peer credentials of the socket to initial_connection_state.
val client :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
_ Async_unix.Tcp.Where_to_connect.t ->
(t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
client where_to_connect () connects to the server at where_to_connect and returns the connection or an Error if a connection could not be made. It is the responsibility of the caller to eventually call close.
In client and with_client, the handshake_timeout encompasses both the TCP connection timeout and the timeout for this module's own handshake.
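A minimal sketch of a client call with an explicit handshake_timeout follows. The helper name connect and the five-second budget are illustrative assumptions; the exception in the error case is converted to an Error.t for convenience.

```ocaml
open! Core
open Async

(* Hypothetical helper: connects to [host]:[port], allowing five seconds
   in total for the TCP connect plus the RPC handshake. *)
let connect ~host ~port : Rpc.Connection.t Or_error.t Deferred.t =
  let where_to_connect =
    Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port)
  in
  Rpc.Connection.client
    ~handshake_timeout:(Time_float.Span.of_sec 5.)
    where_to_connect
  >>| function
  | Ok conn -> Ok conn (* the caller must eventually call [close] *)
  | Error exn -> Error (Error.of_exn exn)
```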
val client' :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
'address Async_unix.Tcp.Where_to_connect.t ->
('address * t, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
Similar to client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.
val with_client :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
_ Async_unix.Tcp.Where_to_connect.t ->
(t -> 'a Async_kernel.Deferred.t) ->
('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
with_client where_to_connect f connects to the server at where_to_connect and runs f until an exception is thrown or until the returned Deferred is fulfilled.
NOTE: As with with_close, you should be careful when using this with Pipe_rpc. See with_close for more information.
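One way with_client might be wrapped so that connection failures and the result of f share a single Or_error.t is sketched below. The name with_rpc_client is hypothetical.

```ocaml
open! Core
open Async

(* Hypothetical wrapper: runs [f] against a fresh connection and folds a
   failed connect or handshake into the same [Or_error.t] that [f] returns. *)
let with_rpc_client
      ~host
      ~port
      (f : Rpc.Connection.t -> 'a Or_error.t Deferred.t)
  : 'a Or_error.t Deferred.t
  =
  let where_to_connect =
    Tcp.Where_to_connect.of_host_and_port (Host_and_port.create ~host ~port)
  in
  Rpc.Connection.with_client where_to_connect f
  >>| function
  | Ok result -> result
  | Error exn -> Error (Error.of_exn exn)
```

For a plain RPC, f could be something like (fun conn -> Rpc.Rpc.dispatch rpc conn query), where rpc and query are supplied by the caller.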
val with_client' :
?implementations:Client_implementations.t ->
?max_message_size:int ->
?make_transport:transport_maker ->
?handshake_timeout:Core.Time_float.Span.t ->
?heartbeat_config:Heartbeat_config.t ->
?description:Core.Info.t ->
?identification:Core.Bigstring.t ->
'transport Async_unix.Tcp.Where_to_connect.t ->
(remote_server:'transport -> t -> 'a Async_kernel.Deferred.t) ->
('a, Core.Exn.t) Core.Result.t Async_kernel.Deferred.t
Similar to with_client, but additionally exposes the Socket.Address.t of the RPC server that we connected to.