Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file unpack_sequence.ml
open!Coreopen!Asyncopen!Importletinput_closed_error=Error.of_string"input closed"letinput_closed_in_the_middle_of_data_error=Error.of_string"input closed in the middle of data";;letunpack_errorerror=Error.create"unpack error"error[%sexp_of:Error.t]moduleUnpack_iter_result=structtype'at=|Input_closed|Input_closed_in_the_middle_of_dataof'aUnpack_buffer.t|Unpack_errorofError.t[@@derivingsexp_of]letto_error:_t->Error.t=function|Input_closed->input_closed_error|Input_closed_in_the_middle_of_data_->input_closed_in_the_middle_of_data_error|Unpack_errorerror->unpack_errorerror;;endmoduleUnpack_result=structtype'at=|Input_closed|Input_closed_in_the_middle_of_dataof'aUnpack_buffer.t|Output_closed|Unpack_errorofError.t[@@derivingsexp_of]letto_error:_t->Error.t=function|Input_closed->input_closed_error|Input_closed_in_the_middle_of_data_->input_closed_in_the_middle_of_data_error|Output_closed->Error.of_string"output closed"|Unpack_errorerror->unpack_errorerror;;leteofunpack_buffer=matchUnpack_buffer.is_emptyunpack_bufferwith|Errorerror->Unpack_errorerror|Oktrue->Input_closed|Okfalse->Input_closed_in_the_middle_of_dataunpack_buffer;;letof_unpack_iter_result:_Unpack_iter_result.t->_t=function|Input_closed->Input_closed|Input_closed_in_the_middle_of_datax->Input_closed_in_the_middle_of_datax|Unpack_errore->Unpack_errore;;endmoduleUnpack_from=structtypet=|PipeofstringPipe.Reader.t|ReaderofReader.tendmoduleUnpack_to=structtype'at=|Iterof('a->unit)|Pipeof'aPipe.Writer.t[@@derivingsexp_of]endletunpack_all~(from:Unpack_from.t)~(to_:_Unpack_to.t)~using:unpack_buffer=letunpack_all_available=matchto_with|Iterf->fun()->(matchUnpack_buffer.unpack_iterunpack_buffer~fwith|Ok()->return`Continue|Errorerror->return(`Stop(Unpack_result.Unpack_errorerror)))|Pipeoutput_writer->letfa=ifPipe.is_closedoutput_writerthen(* This will cause [unpack_iter] below to return [Error], and will result in
[Output_closed]. *)failwith"output closed";Pipe.write_without_pushbackoutput_writerainfun()->(matchUnpack_buffer.unpack_iterunpack_buffer~fwith|Ok()->Pipe.pushbackoutput_writer>>|fun()->`Continue|Errorerror->return(`Stop(ifPipe.is_closedoutput_writerthenUnpack_result.Output_closedelseUnpack_result.Unpack_errorerror)))inletfinished_with_input=matchfromwith|Readerinput->(* In rare situations, a reader can asynchronously raise. We'd rather not raise
here, since we have a natural place to report the error. *)try_with(fun()->Reader.read_one_chunk_at_a_timeinput~handle_chunk:(funbuf~pos~len->matchUnpack_buffer.feedunpack_bufferbuf~pos~lenwith|Errorerror->return(`Stop(Unpack_result.Unpack_errorerror))|Ok()->unpack_all_available()))>>|(function|Errorexn->Unpack_result.Unpack_error(Error.of_exnexn)|Ok(`Stoppedresult)->result|Ok`Eof->Unpack_result.eofunpack_buffer|Ok(`Eof_with_unconsumed_data_)->(* not possible since we always consume everithing *)assertfalse)|Pipeinput->Deferred.repeat_until_finished()(fun()->Pipe.read'input>>=function|`Eof->return(`Finished(Unpack_result.eofunpack_buffer))|`Okq->(matchQueue.iterq~f:(funstring->matchUnpack_buffer.feed_stringunpack_bufferstringwith|Ok()->()|Errorerror->Error.raiseerror)with|exceptionexn->return(`Finished(Unpack_result.Unpack_error(Error.of_exnexn)))|()->unpack_all_available()>>|(function|`Continue->`Repeat()|`Stopz->`Finishedz)))inmatchto_with|Iter_->finished_with_input|Pipeoutput->choose[choicefinished_with_inputFn.id;choice(Pipe.closedoutput)(fun()->Unpack_result.Output_closed)];;letunpack_into_pipe~from~using=letoutput_reader,output_writer=Pipe.create()inletresult=unpack_all~from~to_:(Pipeoutput_writer)~using>>|funresult->Pipe.closeoutput_writer;resultinoutput_reader,result;;letunpack_iter~from~using~f=unpack_all~from~to_:(Iterf)~using>>|function|Input_closed->Unpack_iter_result.Input_closed|Input_closed_in_the_middle_of_datax->Input_closed_in_the_middle_of_datax|Unpack_errorx->Unpack_errorx|Output_closedast->failwiths~here:[%here]"Unpack_sequence.unpack_iter got unexpected value"t[%sexp_of:_Unpack_result.t];;let%test_module_=(modulestructmoduleUnpack_result=structincludeUnpack_resultletcompare_compare_at1t2=matcht1,t2with|Input_closed,Input_closed->0|Input_closed_in_the_middle_of_data_,Input_closed_in_the_middle_of_data_->0|Output_closed,Output_closed->0|Unpack_errore_l,Unpack_errore_r->Error.comparee_le_r|((Input_closed|Input_closed_in_the_middle_of_data_|Output_closed|Unpack_error_),_)->-1;;endletpackbin_writervalues=List.mapvalues~f:(funvalue->Bin_prot.Utils.bin_dump~header:truebin_writervalue|>Bigstring.to_string)|>String.concat;;letbreak_into_piecesstring~of_size=letrecloopstart_idx=ifstart_idx<String.lengthstringthen(letnext_idx=Int.min(start_idx+of_size)(String.lengthstring)inletthis_slice=String.slicestringstart_idxnext_idxinthis_slice::loopnext_idx)else[]inloop0;;let%test_unit_=[%test_result:stringlist](break_into_pieces"foobarx"~of_size:2)~expect:["fo";"ob";"ar";"x"];;moduleValue=structtypet={a:string;b:int}[@@derivingbin_io,compare,sexp]letunpack_buffer()=Unpack_buffer.create_bin_protbin_reader_t(* Create a value unique to the seed. *)letcreateseed=letchar=Char.of_int_exn(seed+Char.to_int'a')in{a=String.makeseedchar;b=seed};;letpackts=packbin_writer_tts(* Bogus bin prot data that we know will *fail* when unpacked as a [Value.t]. *)letbogus_data=letbogus_size=10inletbuf=Bigstring.init(Bin_prot.Utils.size_header_length+bogus_size)~f:(const'\000')inignore(Bin_prot.Utils.bin_write_size_headerbuf~pos:0bogus_size:int);Bigstring.to_stringbuf;;let%test_unit_=letunpack_buffer=unpack_buffer()inok_exn(Unpack_buffer.feed_stringunpack_bufferbogus_data);letq=Queue.create()inmatchUnpack_buffer.unpack_intounpack_bufferqwith|Ok()->assertfalse|Error_->assert(Queue.is_emptyq);;(* A partial [Value.t] bin prot, which will cause [Unpack_buffer] to expect more data
when unpacked. *)letpartial_data=(* The size header should be more than 1 byte, so this is enough to make unpack
wait for more data. *)String.make1' ';;let%test_unit_=letunpack_buffer=unpack_buffer()inok_exn(Unpack_buffer.feed_stringunpack_bufferpartial_data);letq=Queue.create()inmatchUnpack_buffer.unpack_intounpack_bufferqwith|Ok()->assert(Queue.is_emptyq)|Error_->assertfalse;;endletvaluesn=List.initn~f:Value.createlettest_size=50let(>>=)deferredf=lettimeout=sec10.inClock.with_timeouttimeoutdeferred>>|(function|`Timeout->failwithf!"unpack_sequence.ml: Deferred took more than %{Time.Span}"timeout()|`Resultresult->result)>>=f;;let(>>|)deferredf=deferred>>=funx->return(fx)letsetup_string_pipe_reader()=letinput_r,input_w=Pipe.create()inletoutput,finished=unpack_into_pipe~from:(Pipeinput_r)~using:(Value.unpack_buffer())inreturn(input_w,output,finished);;letsetup_iter()=letinput_r,input_w=Pipe.create()inletoutput_r,output_w=Pipe.create()inletfinished=unpack_iter~from:(Pipeinput_r)~using:(Value.unpack_buffer())~f:(funa->Pipe.write_without_pushbackoutput_wa)>>|Unpack_result.of_unpack_iter_resultinreturn(input_w,output_r,finished);;letsetup_reader()=letpipe_info=Info.of_string"unpack sequence test"inletinput_r,input_w=Pipe.create()inReader.of_pipepipe_infoinput_r>>=funreader->letpipe,finished=unpack_into_pipe~from:(Readerreader)~using:(Unpack_buffer.create_bin_protValue.bin_reader_t)inreturn(input_w,pipe,finished);;letrun_tests?(only_supports_output_to_pipe=false)test_fn=Thread_safe.block_on_async_exn(fun()->Deferred.List.iter([setup_reader;setup_string_pipe_reader]@ifonly_supports_output_to_pipethen[]else[setup_iter])~f:(funsetup->setup()>>=fun(input,output,finished)->test_fninputoutputfinished));;let%test_unit"test various full reads"=run_tests(funinputoutputfinished->Deferred.repeat_until_finished(valuestest_size)(funvalues->matchvalueswith|[]->Pipe.closeinput;finished>>=funresult->[%test_result:Value.tUnpack_result.t]result~expect:Unpack_result.Input_closed;return(`Finished())|_::rest->letdata=Value.packvaluesinDeferred.repeat_until_finished1(funof_size->ifof_size>=String.lengthdatathenreturn(`Finished())else(letpieces=break_into_piecesdata~of_sizeinPipe.transfer_in_without_pushbackinput~from:(Queue.of_listpieces);Pipe.read_exactlyoutput~num_values:(List.lengthvalues)>>|function|`Eof|`Fewer_->assertfalse|`Exactlyqueue->[%test_result:Value.tlist](Queue.to_listqueue)~expect:values;`Repeat(of_size+1)))>>=fun()->return(`Repeatrest)));;let%test_unit"input closed in middle of read"=run_tests(funinputoutputfinished->letvalues=valuestest_sizeinletbuffer=Value.packvalues^Value.partial_datainPipe.write_without_pushbackinputbuffer;Pipe.read_exactlyoutput~num_values:(List.lengthvalues)>>=function|`Eof|`Fewer_->assertfalse|`Exactlyqueue->[%test_result:Value.tlist](Queue.to_listqueue)~expect:values;Pipe.closeinput;finished>>=funresult->[%test_result:Value.tUnpack_result.t]result~expect:(Input_closed_in_the_middle_of_data(Value.unpack_buffer()));Deferred.unit);;let%test_unit"output pipe closed"=(* This test relies on detecting that the output pipe has been closed. *)run_tests~only_supports_output_to_pipe:true(fun_inputoutputfinished->Pipe.close_readoutput;Pipe.read'output>>=function|`Ok_->assertfalse|`Eof->finished>>=funresult->[%test_result:Value.tUnpack_result.t]result~expect:Output_closed;Deferred.unit);;let%test_unit"bad bin-io data"=run_tests(funinputoutputfinished->letvalues=valuestest_sizeinletbuffer=Value.packvalues^Value.bogus_datainPipe.write_without_pushbackinputbuffer;Pipe.read_exactlyoutput~num_values:(List.lengthvalues)>>=function|`Eof|`Fewer_->assertfalse|`Exactlyqueue->[%test_result:Value.tlist](Queue.to_listqueue)~expect:values;finished>>|(function|Unpack_error_->()|_->assertfalse));;end);;