A stream of elements.
A stream is an infinite sequence of elements. Since a stream is defined coinductively, it can be processed only corecursively. This means that, in general, one cannot aggregate a stream into normal (inductive) data. It is still possible, however, to get an aggregate snapshot of some intermediate state in the form of a future.
Streams can be observed and combined. There is no built-in notion of the end of a stream, and all streams are considered infinite. It is still possible to simulate the end of a stream by using a future that designates the end-of-stream condition.
Streams can be made lazy, in the sense that if no one is watching a stream, then no work should be performed to feed it. This requires some cooperation from the feeder, which should use the on_subscribe and on_unsubscribe functions to react to users' subscriptions. The has_subscribers function is also useful.
Streams also provide a pushback mechanism that lets a consumer slow down a cooperating producer. The pushback interface consists of two functions:
wait, which should be called by a consumer when it wants to ask a producer to wait for a moment;
on_wait, which is called when any consumer requests a pause.
from f returns a stream that is generated from successive applications of a function f. A new value is produced by the stream every time it is signaled with the associated signal handler.
val unfold : init:'b -> f:('b -> 'a * 'b) -> 'a t * unit signal
unfold ~init ~f is a more general way of building a stream than from: it allows state to be passed between consecutive invocations of the generator function. A new value is produced by the stream every time it is signaled with the associated signal handler.
val unfold_until :
init:'b -> f:('b -> ('a * 'b) option) -> 'a t * unit signal * unit future
unfold_until ~init ~f returns (stream,signal,future). It is the same as unfold, except that function f is called only until it returns None. Once this happens, the future becomes determined.
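The behaviour of unfold_until can be sketched with plain lists. The block below is a model written with the OCaml standard library only, not the library function: unfold_until_model, its ~signals argument (the number of times the signal is sent) and countdown are hypothetical names introduced for illustration, and the boolean result stands in for the future.

```ocaml
(* A list-based model of [unfold_until]; it is not the library function.
   [signals] stands for the number of times the signal is sent, and the
   returned boolean tells whether the end-of-stream future would be
   determined. *)
let unfold_until_model ~init ~f ~signals =
  let rec loop state left acc =
    if left = 0 then (List.rev acc, false)
    else
      match f state with
      | None -> (List.rev acc, true)   (* [f] gave up: the future occurs *)
      | Some (x, state') -> loop state' (left - 1) (x :: acc)
  in
  loop init signals []

(* Count down from the initial state, then stop. *)
let countdown n = if n = 0 then None else Some (n, n - 1)

(* Ten signals are sent, but only three values are ever produced. *)
let produced, finished = unfold_until_model ~init:3 ~f:countdown ~signals:10
```

Here produced is [3; 2; 1] and finished is true. With ~signals:2 the model yields ([3; 2], false), since f has not yet returned None.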
unfold' ~init ~f is a batched version of the unfold function. A new value is produced by the stream every time it is signaled with the associated signal handler.
repeat x returns a stream xs and a signal s. Every time s is signaled, stream xs will produce a value x.
val of_list : 'a list -> 'a t * unit signal * unit future
of_list xs returns a stream ss, a signal s and a future es. The stream produces the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.
val of_array : 'a array -> 'a t * unit signal * unit future
of_array xs returns a stream ss, a signal s and a future es. The stream produces the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.
of_seq xs returns a stream ss, a signal s and a future es. The stream produces the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.
Subscriber interface
To start monitoring a stream, a user should subscribe to it using one of the subscription functions: watch, observe, subscribe.
The subscription can be canceled by using an unsubscribe function, or by throwing an exception from the callback function. The latter plays well with the `with_return` function.
watch s f watches a stream s with a function f. A subscription identifier is passed to the function, so it can be used to unsubscribe from the stream directly from the function.
s' = map' s ~f applies function f to each value of a stream s and pushes the values from the resulting queue into the stream s'. Example:
(* of_list returns a stream, a signal and an end-of-stream future *)
let q, p, _ = of_list ['a'; 'b'; 'c'; '.']
let q' = map' q ~f:(function
    | 'a'..'z' as c ->
      Queue.of_list Char.[uppercase c; lowercase c]
    | c -> Queue.singleton c)
concat ss returns a stream that will produce elements from the input list of streams ss. The ordering of the elements of different streams is unspecified, though it is guaranteed that elements of the same stream will preserve their ordering.
concat_merge xs ~f builds a stream that will produce elements from the input list of streams and applies f to all consecutive elements. The ordering of the input list does not determine the ordering of elements in the output stream, which is undefined. See concat for more information.
split xs ~f returns a pair of streams, where the first stream contains fst (f x) for each x in xs and the second stream contains snd (f x) for each x in xs.
unzip xs creates a pair of streams, where the first stream contains fst x for each x in xs and the second stream contains snd x for each x in xs. Essentially, it is the same as split ~f:ident.
once xs creates a stream that will at most contain the next value produced by xs and nothing more.
val parse : 'a t -> init:'b -> f:('b -> 'a -> 'c option * 'b) -> 'c t
parse ss ~init ~f parses stream ss and builds a new stream ss'. Function f is applied to each consecutive element of ss together with a state s. If f returns None, s', then no value is produced in the output stream, and state s' is passed to the next invocation of f. If it returns Some x, s', then value x is produced by the output stream, and state s' is passed to the next invocation of f. If the state type 'b is a list type, then parse behaves as a pushdown automaton. With an arbitrary state type, it is possible to build automata that fall between a PDA and a Turing machine (not including the latter).
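The state-threading rule of parse can be illustrated on plain lists. The following is a minimal sketch that uses only the OCaml standard library; parse_model and number are hypothetical names, and the model only mirrors the contract described above rather than the library implementation.

```ocaml
(* A list-based model of [parse]: thread a state through the input and
   keep only the values for which [f] returns [Some]. *)
let parse_model xs ~init ~f =
  let step (state, out) x =
    match f state x with
    | None, state' -> (state', out)          (* nothing is produced *)
    | Some y, state' -> (state', y :: out)   (* [y] goes to the output *)
  in
  let _, out = List.fold_left step (init, []) xs in
  List.rev out

(* Group digits into numbers; any non-digit character ends a number.
   The state is the number accumulated so far, if any. *)
let number acc c =
  match c, acc with
  | '0' .. '9', None -> (None, Some (Char.code c - 48))
  | '0' .. '9', Some n -> (None, Some ((n * 10) + Char.code c - 48))
  | _, Some n -> (Some n, None)
  | _, None -> (None, None)

let numbers =
  parse_model ['1'; '2'; ' '; '7'; ','; ' '; '4'; '0'; ';'] ~init:None ~f:number
```

In this sketch numbers evaluates to [12; 7; 40]: a value is emitted only when a separator closes a run of digits, and the state carries the partial number in between.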
val foldw : ?stride:int -> 'a t -> int -> init:'b -> f:('b -> 'a -> 'b) -> 'b t
foldw ss n ~init ~f performs a windowed fold of the stream. A function f is folded over n consecutive elements of ss, then the result is produced into the output stream, the window is shifted by stride (which defaults to one), and function f is applied to the next n elements. For example, if stream ss produced the following sequence of elements:
1,2,3,4,5,6,7,8
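and the window length n is equal to 3, then, with the default stride of one, the function f will be applied to the following sequences: [1;2;3], [2;3;4], [3;4;5], [4;5;6], [5;6;7], [6;7;8].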
For example, a moving average filter can be implemented with foldw:
let moving_average ss n =
  Float.(foldw ss n ~init:zero ~f:(+) >>| fun s -> s / of_int n)
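To make the windowing rule concrete, here is a list-based sketch of a windowed fold. It uses only the OCaml standard library; take, drop and foldw_model are hypothetical helpers, and the model assumes n > 0 and stride > 0. It is an approximation of the described semantics, not the library code.

```ocaml
(* Take the first [n] elements of [xs], or [None] if there are fewer. *)
let rec take n xs =
  if n = 0 then Some []
  else
    match xs with
    | [] -> None
    | x :: rest -> Option.map (fun tl -> x :: tl) (take (n - 1) rest)

(* Drop up to [k] elements from the front of [xs]. *)
let rec drop k xs =
  if k = 0 then xs
  else match xs with [] -> [] | _ :: rest -> drop (k - 1) rest

(* A list-based model of [foldw]: fold [f] over every full window of [n]
   elements, moving the window by [stride] between folds.
   Assumes [n > 0] and [stride > 0]. *)
let foldw_model ?(stride = 1) xs n ~init ~f =
  let rec loop xs acc =
    match take n xs with
    | None -> List.rev acc
    | Some window -> loop (drop stride xs) (List.fold_left f init window :: acc)
  in
  loop xs []

(* Sums over windows of length 3 of the sequence 1..8. *)
let sums = foldw_model [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:( + )
```

With the default stride, sums is [6; 9; 12; 15; 18; 21], one result per window. Passing ~stride:3 makes the windows disjoint and gives [6; 15], because the trailing two elements never fill a window.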
val frame : clk:unit t -> 'a t -> init:'b -> f:('b -> 'a -> 'b) -> 'b t
frame ~clk s ~init ~f gathers elements of s into frames, where the start of a new frame is signaled by the stream clk. The function is very similar to foldw, except that the window is determined dynamically by the clk stream. This function is useful for building custom time scales.
The semantics of the function can be clarified with the following description:
1. Every time the stream s produces a value, it is buffered.
2. Every time the stream clk produces a value, the function f is folded over all buffered values, and the result is put into the output stream. The internal buffer is cleared afterwards.
Example:
Consider the following timing diagram, where each row represents a stream, and columns represent time. Elements of the clk stream are depicted with a T symbol.
clk:     T           T        T T      T   T
ss:  123   56 123 12   1234 4     1234   1
will be framed in the following way:
[123], [5612312], [12344], [], [1234], [1]
Note: since all streams are serialized, it is impossible for two events to occur at the same time. So in any column of the timing diagram there can be only one event.
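The framing example can be replayed on a serialized timeline. This is a sketch under stated assumptions: the event type, frame_model, values and timeline are hypothetical names, only the OCaml standard library is used, and the two streams are represented as one list of events in order of occurrence.

```ocaml
(* Events of the two streams merged into one serialized timeline. *)
type 'a event = Tick | Value of 'a

(* A list-based model of [frame]: buffer values, and on every tick fold [f]
   over the buffer, emit the result and clear the buffer. *)
let frame_model events ~init ~f =
  let step (buf, out) = function
    | Value x -> (x :: buf, out)
    | Tick -> ([], List.fold_left f init (List.rev buf) :: out)
  in
  let _, out = List.fold_left step ([], []) events in
  List.rev out

(* Turn every character of a string into a [Value] event. *)
let values s = List.map (fun c -> Value c) (List.of_seq (String.to_seq s))

(* The timing diagram above, written as a timeline. *)
let timeline =
  values "123" @ [Tick] @ values "5612312" @ [Tick] @ values "12344"
  @ [Tick; Tick] @ values "1234" @ [Tick] @ values "1" @ [Tick]

(* Collect each frame into a string. *)
let frames =
  frame_model timeline ~init:"" ~f:(fun acc c -> acc ^ String.make 1 c)
```

In this model frames is ["123"; "5612312"; "12344"; ""; "1234"; "1"], which matches the framing listed above. Two ticks in a row yield an empty frame, that is, the fold of f over no elements.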
hd s returns a future that will occur as soon as stream s produces a value. Note: if hd is called on a stream that has already produced some values, the returned future will still be fulfilled by the first value that is put into the stream after the future is created.
nth xs n returns the n'th element of the stream xs. The element is n'th with respect to the future f, that is, it is the n'th element that the stream produces after the future is created.
upon e xs returns a future that will be fulfilled with the last value of a stream xs before an event e has occurred. If the stream xs has not produced any elements by the time the event e occurs, then the future will not be fulfilled.
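The rule for upon can be stated as a small model over a serialized timeline. The sketch below uses only the OCaml standard library; the occurrence type and upon_model are hypothetical, and None stands for a future that is never fulfilled.

```ocaml
(* A serialized timeline: values produced by [xs] and the event [e]. *)
type 'a occurrence = Produced of 'a | Event

(* A model of [upon e xs]: the last value produced before the event.
   [None] stands for a future that is never fulfilled. *)
let upon_model timeline =
  let rec loop last = function
    | [] -> None                  (* the event never occurred *)
    | Event :: _ -> last          (* [None] if nothing was produced yet *)
    | Produced x :: rest -> loop (Some x) rest
  in
  loop None timeline

let fulfilled = upon_model [Produced 1; Produced 2; Event; Produced 3]
let unfulfilled = upon_model [Event; Produced 1]
```

Here fulfilled is Some 2, because 2 is the last value seen before the event, and unfulfilled is None, because the event arrives before any value.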