Source file unpack_sequence.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
open! Core
open! Async
open! Import
(** Error that [to_error] returns for an [Input_closed] result. *)
let input_closed_error = Error.of_string "input closed"

(** Error that [to_error] returns for an [Input_closed_in_the_middle_of_data]
    result. *)
let input_closed_in_the_middle_of_data_error =
  Error.of_string "input closed in the middle of data"
;;

(** [unpack_error error] builds a new error tagged ["unpack error"] that carries
    [error] as its payload. *)
let unpack_error error = Error.create "unpack error" error Error.sexp_of_t
(** Reasons for which [unpack_iter] can stop. Unlike [Unpack_result.t] there is no
    [Output_closed] case. *)
module Unpack_iter_result = struct
  type 'a t =
    | Input_closed
    | Input_closed_in_the_middle_of_data of 'a Unpack_buffer.t
    | Unpack_error of Error.t
  [@@deriving sexp_of]

  (** [to_error t] maps a stop reason to the matching [Error.t]. *)
  let to_error (result : _ t) : Error.t =
    match result with
    | Unpack_error error -> unpack_error error
    | Input_closed_in_the_middle_of_data _ -> input_closed_in_the_middle_of_data_error
    | Input_closed -> input_closed_error
  ;;
end
(** Reasons for which [unpack_all] / [unpack_into_pipe] can stop. *)
module Unpack_result = struct
  type 'a t =
    | Input_closed
    | Input_closed_in_the_middle_of_data of 'a Unpack_buffer.t
    | Output_closed
    | Unpack_error of Error.t
  [@@deriving sexp_of]

  (** [to_error t] maps a stop reason to the matching [Error.t]. *)
  let to_error (result : _ t) : Error.t =
    match result with
    | Unpack_error error -> unpack_error error
    | Output_closed -> Error.of_string "output closed"
    | Input_closed_in_the_middle_of_data _ -> input_closed_in_the_middle_of_data_error
    | Input_closed -> input_closed_error
  ;;

  (** [eof unpack_buffer] classifies an end-of-input according to
      [Unpack_buffer.is_empty]: an empty buffer gives [Input_closed], a non-empty
      one gives [Input_closed_in_the_middle_of_data] carrying the buffer, and an
      [Error] from [is_empty] gives [Unpack_error]. *)
  let eof unpack_buffer =
    match Unpack_buffer.is_empty unpack_buffer with
    | Ok true -> Input_closed
    | Ok false -> Input_closed_in_the_middle_of_data unpack_buffer
    | Error error -> Unpack_error error
  ;;

  (** Embeds an [Unpack_iter_result.t] into this type, constructor by
      constructor. *)
  let of_unpack_iter_result : _ Unpack_iter_result.t -> _ t = function
    | Unpack_error error -> Unpack_error error
    | Input_closed_in_the_middle_of_data unpack_buffer ->
      Input_closed_in_the_middle_of_data unpack_buffer
    | Input_closed -> Input_closed
  ;;
end
(** Where [unpack_all] reads its raw input from. *)
module Unpack_from = struct
type t =
(* Input arrives as strings read from a pipe. *)
| Pipe of string Pipe.Reader.t
(* Input is read chunk by chunk from an Async [Reader.t]. *)
| Reader of Reader.t
end
(** Where [unpack_all] delivers the values it unpacks. *)
module Unpack_to = struct
type 'a t =
(* Each unpacked value is passed to this callback. *)
| Iter of ('a -> unit)
(* Each unpacked value is written to this pipe. *)
| Pipe of 'a Pipe.Writer.t
[@@deriving sexp_of]
end
(** [unpack_all ~from ~to_ ~using:unpack_buffer] repeatedly feeds input read from
    [from] into [unpack_buffer] and hands every unpacked value to [to_]. The
    returned deferred is determined with the [Unpack_result.t] that explains why
    unpacking stopped. *)
let unpack_all ~(from : Unpack_from.t) ~(to_ : _ Unpack_to.t) ~using:unpack_buffer =
(* [unpack_all_available ()] runs [Unpack_buffer.unpack_iter] over the buffer and
   reports either [`Continue] or [`Stop result]. *)
let unpack_all_available =
match to_ with
| Iter f ->
fun () ->
(match Unpack_buffer.unpack_iter unpack_buffer ~f with
| Ok () -> return `Continue
| Error error -> return (`Stop (Unpack_result.Unpack_error error)))
| Pipe output_writer ->
let f a =
if Pipe.is_closed output_writer
then
(* Refuse to write to a closed pipe. The [Error] branch below re-checks
   [Pipe.is_closed] so that this failure is reported as [Output_closed]
   rather than as an unpack error. *)
failwith "output closed";
Pipe.write_without_pushback output_writer a
in
fun () ->
(match Unpack_buffer.unpack_iter unpack_buffer ~f with
(* Wait for the pipe's pushback before asking for more input. *)
| Ok () -> Pipe.pushback output_writer >>| fun () -> `Continue
| Error error ->
return
(`Stop
(if Pipe.is_closed output_writer
then Unpack_result.Output_closed
else Unpack_result.Unpack_error error)))
in
(* Deferred that is determined once the input side is done, whether by end of
   input or by an error. *)
let finished_with_input =
match from with
| Reader input ->
(* [try_with] turns an exception raised while reading into an [Unpack_error]
   result instead of letting it escape. *)
try_with (fun () ->
Reader.read_one_chunk_at_a_time input ~handle_chunk:(fun buf ~pos ~len ->
match Unpack_buffer.feed unpack_buffer buf ~pos ~len with
| Error error -> return (`Stop (Unpack_result.Unpack_error error))
| Ok () -> unpack_all_available ()))
>>| (function
| Error exn -> Unpack_result.Unpack_error (Error.of_exn exn)
| Ok (`Stopped result) -> result
| Ok `Eof -> Unpack_result.eof unpack_buffer
| Ok (`Eof_with_unconsumed_data _) ->
(* Presumably unreachable: the chunk handler above only ever answers [`Stop]
   or [`Continue], never a partial-consumption result. *)
assert false)
| Pipe input ->
Deferred.repeat_until_finished () (fun () ->
Pipe.read' input
>>= function
| `Eof -> return (`Finished (Unpack_result.eof unpack_buffer))
| `Ok q ->
(* Feed every string of the batch. [Error.raise] combined with the
   [exception] arm stops at the first feed failure and reports it as an
   [Unpack_error]. *)
(match
Queue.iter q ~f:(fun string ->
match Unpack_buffer.feed_string unpack_buffer string with
| Ok () -> ()
| Error error -> Error.raise error)
with
| exception exn ->
return (`Finished (Unpack_result.Unpack_error (Error.of_exn exn)))
| () ->
unpack_all_available ()
>>| (function
| `Continue -> `Repeat ()
| `Stop z -> `Finished z)))
in
match to_ with
| Iter _ -> finished_with_input
| Pipe output ->
(* When writing to a pipe, the result is also determined with [Output_closed]
   if the output pipe becomes closed before the input side finishes. *)
choose
[ choice finished_with_input Fn.id
; choice (Pipe.closed output) (fun () -> Unpack_result.Output_closed)
]
;;
(** [unpack_into_pipe ~from ~using] creates a fresh pipe, runs [unpack_all] with
    the pipe's writer as destination, and returns the pipe's reader together with
    the deferred result. The writer is closed once [unpack_all] has finished. *)
let unpack_into_pipe ~from ~using =
  let output_reader, output_writer = Pipe.create () in
  let close_output_and_return outcome =
    Pipe.close output_writer;
    outcome
  in
  let result =
    unpack_all ~from ~to_:(Pipe output_writer) ~using >>| close_output_and_return
  in
  output_reader, result
;;
(** [unpack_iter ~from ~using ~f] runs [unpack_all] with the callback [f] as
    destination and converts the outcome to an [Unpack_iter_result.t].

    Raises if [unpack_all] reports [Output_closed], which is not expected when
    the destination is a callback. *)
let unpack_iter ~from ~using ~f =
  let to_iter_result : _ Unpack_result.t -> _ Unpack_iter_result.t = function
    | Input_closed -> Input_closed
    | Input_closed_in_the_middle_of_data unpack_buffer ->
      Input_closed_in_the_middle_of_data unpack_buffer
    | Unpack_error error -> Unpack_error error
    | Output_closed as unexpected ->
      failwiths
        ~here:[%here]
        "Unpack_sequence.unpack_iter got unexpected value"
        unexpected
        [%sexp_of: _ Unpack_result.t]
  in
  unpack_all ~from ~to_:(Iter f) ~using >>| to_iter_result
;;
let%test_module _ =
(module struct
(* Test-only extension of [Unpack_result] adding the [compare] function that
   [%test_result] needs. *)
module Unpack_result = struct
  include Unpack_result

  (* Two results compare equal when they use the same constructor; errors are
     additionally compared with [Error.compare], and the payload of
     [Input_closed_in_the_middle_of_data] is ignored. Every mismatch yields [-1],
     so only the zero / non-zero distinction is meaningful. *)
  let compare _compare_a t1 t2 =
    match t1, t2 with
    | Unpack_error e_l, Unpack_error e_r -> Error.compare e_l e_r
    | Input_closed, Input_closed
    | Input_closed_in_the_middle_of_data _, Input_closed_in_the_middle_of_data _
    | Output_closed, Output_closed -> 0
    | ( ( Input_closed
        | Input_closed_in_the_middle_of_data _
        | Output_closed
        | Unpack_error _ )
      , _ ) -> -1
  ;;
end
(* [pack bin_writer values] dumps every value with [bin_writer], each one
   preceded by a size header ([~header:true]), and concatenates the results into
   a single string. *)
let pack bin_writer values =
  let dump value =
    Bigstring.to_string (Bin_prot.Utils.bin_dump ~header:true bin_writer value)
  in
  String.concat (List.map values ~f:dump)
;;
(* [break_into_pieces string ~of_size] splits [string] into consecutive slices of
   [of_size] characters; the last slice may be shorter. An empty [string] gives
   the empty list.

   Raises if [of_size] is not positive: such a size would never advance through
   the string, so the loop could not terminate. *)
let break_into_pieces string ~of_size =
  if of_size <= 0
  then failwithf "break_into_pieces: of_size must be positive, got %d" of_size ();
  let length = String.length string in
  (* Accumulate the slices in reverse so that the loop is tail-recursive. *)
  let rec loop start_idx rev_pieces =
    if start_idx < length
    then (
      let next_idx = Int.min (start_idx + of_size) length in
      let this_slice = String.slice string start_idx next_idx in
      loop next_idx (this_slice :: rev_pieces))
    else List.rev rev_pieces
  in
  loop 0 []
;;
(* A seven-character string cut into pieces of two leaves a one-character tail. *)
let%test_unit _ =
  let pieces = break_into_pieces "foobarx" ~of_size:2 in
  [%test_result: string list] pieces ~expect:[ "fo"; "ob"; "ar"; "x" ]
;;
(* Sample bin-io-serializable value used by the tests below, together with
   well-formed, malformed and truncated encodings of it. *)
module Value = struct
  (* The field names and types match the record literal built in [create]. *)
  type t =
    { a : string
    ; b : int
    }
  [@@deriving bin_io, compare, sexp]

  (* A fresh unpack buffer that decodes [t] values with [bin_reader_t]. *)
  let unpack_buffer () = Unpack_buffer.create_bin_prot bin_reader_t

  (* [create seed] builds a value whose string field has [seed] characters, all
     equal to the character [seed] positions after 'a', and whose int field is
     [seed]. *)
  let create seed =
    let char = Char.of_int_exn (seed + Char.to_int 'a') in
    { a = String.make seed char; b = seed }
  ;;

  (* Serializes [ts] with the enclosing [pack] helper. *)
  let pack ts = pack bin_writer_t ts

  (* A size header announcing [bogus_size] bytes, followed by that many zero
     bytes. The test just below checks that unpacking it fails. *)
  let bogus_data =
    let bogus_size = 10 in
    let buf =
      Bigstring.init
        (Bin_prot.Utils.size_header_length + bogus_size)
        ~f:(const '\000')
    in
    ignore (Bin_prot.Utils.bin_write_size_header buf ~pos:0 bogus_size : int);
    Bigstring.to_string buf
  ;;

  (* Unpacking [bogus_data] must report an error and produce no value. *)
  let%test_unit _ =
    let unpack_buffer = unpack_buffer () in
    ok_exn (Unpack_buffer.feed_string unpack_buffer bogus_data);
    let q = Queue.create () in
    match Unpack_buffer.unpack_into unpack_buffer q with
    | Ok () -> assert false
    | Error _ -> assert (Queue.is_empty q)
  ;;

  (* A single byte of input. The test just below checks that it unpacks without
     error and without producing a value, i.e. it is treated as incomplete
     data. *)
  let partial_data = String.make 1 ' '

  (* Unpacking [partial_data] must succeed without producing any value. *)
  let%test_unit _ =
    let unpack_buffer = unpack_buffer () in
    ok_exn (Unpack_buffer.feed_string unpack_buffer partial_data);
    let q = Queue.create () in
    match Unpack_buffer.unpack_into unpack_buffer q with
    | Ok () -> assert (Queue.is_empty q)
    | Error _ -> assert false
  ;;
end
(* Number of sample values used by the tests. *)
let test_size = 50

(* [values n] is the list [Value.create 0; ...; Value.create (n - 1)]. *)
let values n = List.init n ~f:(fun seed -> Value.create seed)
(* Test-local bind: like the ambient [>>=], except that it fails if [deferred]
   is not determined within ten seconds. Every [>>=] and [>>|] used after this
   point in the test module goes through this timeout. *)
let ( >>= ) deferred f =
  let timeout = sec 10. in
  let fail_on_timeout = function
    | `Result result -> result
    | `Timeout ->
      failwithf
        !"unpack_sequence.ml: Deferred took more than %{Time.Span}"
        timeout
        ()
  in
  Clock.with_timeout timeout deferred >>| fail_on_timeout >>= f
;;

(* Test-local map, defined on top of the timeout-guarded bind above. *)
let ( >>| ) deferred f = deferred >>= fun value -> return (f value)
(* Test fixture: strings written to the returned writer are unpacked from a
   string pipe into the returned value pipe. Returns
   [(input writer, output reader, finished)]. *)
let setup_string_pipe_reader () =
  let string_reader, string_writer = Pipe.create () in
  let unpacked_values, finished =
    unpack_into_pipe ~from:(Pipe string_reader) ~using:(Value.unpack_buffer ())
  in
  return (string_writer, unpacked_values, finished)
;;
(* Test fixture exercising [unpack_iter]: each unpacked value is pushed by the
   callback into a second pipe so that tests can read it like the pipe-based
   fixtures. Returns [(input writer, output reader, finished)]. *)
let setup_iter () =
  let string_reader, string_writer = Pipe.create () in
  let value_reader, value_writer = Pipe.create () in
  let forward value = Pipe.write_without_pushback value_writer value in
  let finished =
    (* [>>|] is the test-local, timeout-guarded operator defined above. *)
    unpack_iter ~from:(Pipe string_reader) ~using:(Value.unpack_buffer ()) ~f:forward
    >>| Unpack_result.of_unpack_iter_result
  in
  return (string_writer, value_reader, finished)
;;
(* Test fixture: wraps a string pipe in an Async [Reader.t] and unpacks from that
   reader into a value pipe. Returns [(input writer, output reader, finished)]. *)
let setup_reader () =
  let string_reader, string_writer = Pipe.create () in
  let reader_info = Info.of_string "unpack sequence test" in
  Reader.of_pipe reader_info string_reader
  >>= fun reader ->
  let unpacked_values, finished =
    unpack_into_pipe
      ~from:(Reader reader)
      ~using:(Unpack_buffer.create_bin_prot Value.bin_reader_t)
  in
  return (string_writer, unpacked_values, finished)
;;
(* [run_tests test_fn] runs [test_fn input output finished] once per fixture,
   blocking until all runs complete. The [setup_iter] fixture is skipped when
   [only_supports_output_to_pipe] is [true]. *)
let run_tests ?(only_supports_output_to_pipe = false) test_fn =
  let pipe_output_setups = [ setup_reader; setup_string_pipe_reader ] in
  let setups =
    if only_supports_output_to_pipe
    then pipe_output_setups
    else pipe_output_setups @ [ setup_iter ]
  in
  Thread_safe.block_on_async_exn (fun () ->
    Deferred.List.iter setups ~f:(fun setup ->
      setup () >>= fun (input, output, finished) -> test_fn input output finished))
;;
(* Packs progressively shorter suffixes of the sample values and sends each
   packed string through the unpacker cut into pieces of every size from 1 up to
   (but not including) the string length, checking that the same values come
   back each time. Once no values are left, closing the input must yield
   [Input_closed]. *)
let%test_unit "test various full reads" =
run_tests (fun input output finished ->
(* Outer loop: the state is the list of values still to be tested. *)
Deferred.repeat_until_finished (values test_size) (fun values ->
match values with
| [] ->
Pipe.close input;
finished
>>= fun result ->
[%test_result: Value.t Unpack_result.t]
result
~expect:Unpack_result.Input_closed;
return (`Finished ())
| _ :: rest ->
let data = Value.pack values in
(* Inner loop: the state is the piece size used to cut [data]. *)
Deferred.repeat_until_finished 1 (fun of_size ->
if of_size >= String.length data
then return (`Finished ())
else (
let pieces = break_into_pieces data ~of_size in
Pipe.transfer_in_without_pushback
input
~from:(Queue.of_list pieces);
Pipe.read_exactly output ~num_values:(List.length values)
>>| function
| `Eof | `Fewer _ -> assert false
| `Exactly queue ->
[%test_result: Value.t list] (Queue.to_list queue) ~expect:values;
`Repeat (of_size + 1)))
(* Drop the first value and go around again with the shorter list. *)
>>= fun () -> return (`Repeat rest)))
;;
(* Sends complete values followed by a truncated one, then closes the input: all
   complete values must come out and the final result must be
   [Input_closed_in_the_middle_of_data]. *)
let%test_unit "input closed in middle of read" =
  run_tests (fun input output finished ->
    let expected_values = values test_size in
    Pipe.write_without_pushback input (Value.pack expected_values ^ Value.partial_data);
    Pipe.read_exactly output ~num_values:(List.length expected_values)
    >>= fun read_result ->
    match read_result with
    | `Eof | `Fewer _ -> assert false
    | `Exactly queue ->
      [%test_result: Value.t list] (Queue.to_list queue) ~expect:expected_values;
      Pipe.close input;
      finished
      >>= fun result ->
      (* The test-local [compare] ignores the buffer payload, so any buffer
         serves as the expected one. *)
      [%test_result: Value.t Unpack_result.t]
        result
        ~expect:(Input_closed_in_the_middle_of_data (Value.unpack_buffer ()));
      Deferred.unit)
;;
(* Closing the read end of the output pipe must finish the unpacking with
   [Output_closed]. Only fixtures that output to a pipe are exercised. *)
let%test_unit "output pipe closed" =
  run_tests ~only_supports_output_to_pipe:true (fun _input output finished ->
    Pipe.close_read output;
    Pipe.read' output
    >>= fun read_result ->
    match read_result with
    | `Ok _ -> assert false
    | `Eof ->
      finished
      >>= fun result ->
      [%test_result: Value.t Unpack_result.t] result ~expect:Output_closed;
      Deferred.unit)
;;
(* Sends complete values followed by malformed data: all complete values must
   come out and the final result must be an [Unpack_error]. *)
let%test_unit "bad bin-io data" =
  run_tests (fun input output finished ->
    let expected_values = values test_size in
    Pipe.write_without_pushback input (Value.pack expected_values ^ Value.bogus_data);
    Pipe.read_exactly output ~num_values:(List.length expected_values)
    >>= fun read_result ->
    match read_result with
    | `Eof | `Fewer _ -> assert false
    | `Exactly queue ->
      [%test_result: Value.t list] (Queue.to_list queue) ~expect:expected_values;
      finished
      >>| (function
        | Unpack_error _ -> ()
        | Input_closed | Input_closed_in_the_middle_of_data _ | Output_closed ->
          assert false))
;;
end)
;;