Source file state.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
let ( <.> ) f g x = f (g x)
type (+'a, 'err) t =
| Read of {
buffer : bytes;
off : int;
len : int;
k : [ `End | `Len of int ] -> ('a, 'err) t;
}
| Write of { buffer : string; off : int; len : int; k : int -> ('a, 'err) t }
| Return of 'a
| Error of 'err
let rec reword_error : type v a b. (a -> b) -> (v, a) t -> (v, b) t =
fun f -> function
| Error err -> Error (f err)
| Read { k; buffer; off; len } ->
Read { k = reword_error f <.> k; buffer; off; len }
| Write { k; buffer; off; len } ->
Write { k = reword_error f <.> k; buffer; off; len }
| Return _ as x -> x
let rec join : type a err. ((a, err) t, err) t -> (a, err) t = function
| Error _ as err -> err
| Read { k; buffer; off; len } -> Read { k = join <.> k; buffer; off; len }
| Write { k; buffer; off; len } -> Write { k = join <.> k; buffer; off; len }
| Return v -> v
let rec to_result : type a err. (a, err) t -> ((a, err) result, _) t = function
| Error err -> Return (Error err)
| Return v -> Return (Ok v)
| Read { k; buffer; off; len } ->
Read { k = to_result <.> k; buffer; off; len }
| Write { k; buffer; off; len } ->
Write { k = to_result <.> k; buffer; off; len }
module Context = struct
type t = { encoder : Encoder.encoder; decoder : Decoder.decoder }
type encoder = Encoder.encoder
type decoder = Decoder.decoder
let pp ppf t =
Fmt.pf ppf "{ @[<hov>encoder= @[<hov>%a@];@ decoder= @[<hov>%a@]@] }"
Encoder.pp t.encoder Decoder.pp t.decoder
let encoder_ex_nihilo () = Bytes.create Encoder.io_buffer_size
let decoder_ex_nihilo () = Bytes.create Decoder.io_buffer_size
let make ?(encoder = encoder_ex_nihilo) ?(decoder = decoder_ex_nihilo) () =
{
encoder = Encoder.encoder_from_preallocated_bytes (encoder ());
decoder = Decoder.decoder_from_preallocated_bytes (decoder ());
}
let encoder { encoder; _ } = encoder
let decoder { decoder; _ } = decoder
end
module type S = sig
type 'a send
type 'a recv
type error
type encoder
type decoder
val encode : encoder -> 'a send -> 'a -> (unit, error) t
val decode : decoder -> 'a recv -> ('a, error) t
end
module type C = sig
type t
type encoder
type decoder
val pp : t Fmt.t
val encoder : t -> encoder
val decoder : t -> decoder
end
module Scheduler
(Context : C)
(Value : S
with type encoder = Context.encoder
and type decoder = Context.decoder) =
struct
type error = Value.error
let rec go ~f m len =
match m len with
| Return v -> f v
| Read { k; off; len; buffer } -> Read { k = go ~f k; off; len; buffer }
| Write { k; off; len; buffer } ->
let k0 = function `End -> k 0 | `Len len -> k len in
let k1 = function 0 -> go ~f k0 `End | len -> go ~f k0 (`Len len) in
Write { k = k1; off; len; buffer }
| Error err -> Error err
let bind : ('a, 'err) t -> f:('a -> ('b, 'err) t) -> ('b, 'err) t =
fun m ~f ->
match m with
| Return v -> f v
| Error err -> Error err
| Read { k; off; len; buffer } -> Read { k = go ~f k; off; len; buffer }
| Write { k; off; len; buffer } ->
let k0 = function `End -> k 0 | `Len len -> k len in
let k1 = function 0 -> go ~f k0 `End | len -> go ~f k0 (`Len len) in
Write { k = k1; off; len; buffer }
let rec go ~f m len =
match m len with
| Return v -> f (Ok v)
| Read { k; off; len; buffer } -> Read { k = go ~f k; off; len; buffer }
| Write { k; off; len; buffer } ->
let k0 = function `End -> k 0 | `Len len -> k len in
let k1 = function 0 -> go ~f k0 `End | len -> go ~f k0 (`Len len) in
Write { k = k1; off; len; buffer }
| Error err -> f (Error err)
let bind' :
('a, 'err) t -> f:(('a, 'err) result -> ('b, 'err) t) -> ('b, 'err) t =
fun m ~f ->
match m with
| Return v -> f (Ok v)
| Error err -> f (Error err)
| Read { k; off; len; buffer } -> Read { k = go ~f k; off; len; buffer }
| Write { k; off; len; buffer } ->
let k0 = function `End -> k 0 | `Len len -> k len in
let k1 = function 0 -> go ~f k0 `End | len -> go ~f k0 (`Len len) in
Write { k = k1; off; len; buffer }
let ( let* ) m f = bind m ~f
let ( let+ ) m f = bind' m ~f
let ( >>= ) m f = bind m ~f
let encode :
type a.
Context.t ->
a Value.send ->
a ->
(Context.t -> ('b, [> `Protocol of error ]) t) ->
('b, [> `Protocol of error ]) t =
fun ctx w v k ->
let rec go = function
| Return () -> k ctx
| Write { k; buffer; off; len } ->
Write { k = go <.> k; buffer; off; len }
| Read { k; buffer; off; len } -> Read { k = go <.> k; buffer; off; len }
| Error err -> Error (`Protocol err) in
go (Value.encode (Context.encoder ctx) w v)
let send :
type a.
Context.t -> a Value.send -> a -> (unit, [> `Protocol of error ]) t =
fun ctx w x -> encode ctx w x (fun _ctx -> Return ())
let decode :
type a.
Context.t ->
a Value.recv ->
(Context.t -> a -> ('b, [> `Protocol of error ]) t) ->
('b, [> `Protocol of error ]) t =
fun ctx w k ->
let rec go : (a, 'err) t -> ('b, [> `Protocol of error ]) t = function
| Read { k; buffer; off; len } -> Read { k = go <.> k; buffer; off; len }
| Write { k; buffer; off; len } ->
Write { k = go <.> k; buffer; off; len }
| Return v -> k ctx v
| Error err -> Error (`Protocol err) in
go (Value.decode (Context.decoder ctx) w)
let recv : type a. Context.t -> a Value.recv -> (a, [> `Protocol of error ]) t
=
fun ctx w -> decode ctx w (fun _ctx v -> Return v)
let reword_error f x =
let rec go = function
| Read { k; buffer; off; len } -> Read { k = go <.> k; buffer; off; len }
| Write { k; buffer; off; len } ->
Write { k = go <.> k; buffer; off; len }
| Return v -> Return v
| Error err -> Error (f err) in
go x
let return v = Return v
let fail error = Error error
let error_msgf fmt = Fmt.kstr (fun err -> Error (`Msg err)) fmt
end