Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file rpc_transport_low_latency.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814openCoreopenImportmoduleKernel_transport=Rpc_kernel.TransportmoduleHeader=Kernel_transport.HeadermoduleHandler_result=Kernel_transport.Handler_resultmoduleSend_result=Kernel_transport.Send_resultexternalwritev2:Core.Unix.File_descr.t->buf1:Bigstring.t->pos1:int->len1:int->buf2:Bigstring.t->pos2:int->len2:int->Unix.Syscall_result.Int.t="async_extra_rpc_writev2_byte""async_extra_rpc_writev2"[@@noalloc]moduleConfig=struct(* Same as the default value of [buffer_age_limit] for [Async_unix.Writer] *)letdefault_write_timeout=Time_ns.Span.of_min2.(* No maximum *)letdefault_max_message_size=Int.max_valueletdefault_max_buffer_size=Int.max_value(* In general we'll send 1 message per job, if we send 2 there is a good chance we are
sending a batch.
Default should actually be 1, but there was a bug that made it 2 in practice, so we
keep 2 as a default. *)letdefault_start_batching_after_num_messages=2(* Arbitrary choices. *)letdefault_initial_buffer_size=64*1024letdefault_buffering_threshold_in_bytes=32*1024typet={max_message_size:int[@defaultdefault_max_message_size];initial_buffer_size:int[@defaultdefault_initial_buffer_size];max_buffer_size:int[@defaultdefault_max_buffer_size];write_timeout:Time_ns.Span.t[@defaultdefault_write_timeout];buffering_threshold_in_bytes:int[@defaultdefault_buffering_threshold_in_bytes];start_batching_after_num_messages:int[@defaultdefault_start_batching_after_num_messages]}[@@derivingsexp]letvalidatet=ift.initial_buffer_size<=0||t.max_message_size<=0||t.initial_buffer_size>t.max_buffer_size||t.max_message_size>t.max_buffer_size||t.buffering_threshold_in_bytes<0||t.start_batching_after_num_messages<0||Time_ns.Span.(<=)t.write_timeoutTime_ns.Span.zerothenfailwiths~here:[%here]"Rpc_transport_low_latency.Config.validate: invalid config"tsexp_of_t;t;;lett_of_sexpsexp=t_of_sexpsexp|>validateletcreate?(max_message_size=default_max_message_size)?(initial_buffer_size=default_initial_buffer_size)?(max_buffer_size=default_max_buffer_size)?(write_timeout=default_write_timeout)?(buffering_threshold_in_bytes=default_buffering_threshold_in_bytes)?(start_batching_after_num_messages=default_start_batching_after_num_messages)()=validate{max_message_size;initial_buffer_size;max_buffer_size;write_timeout;buffering_threshold_in_bytes;start_batching_after_num_messages};;letdefault=create()letmessage_size_okt~payload_len=payload_len>=0&&payload_len<=t.max_message_size;;letcheck_message_sizet~payload_len=ifnot(message_size_okt~payload_len)thenraise_s[%sexp"Rpc_transport_low_latency: message too small or too big",{message_size=(payload_len:int);config=(t:t)}];;letgrow_buffertbuf~new_size_request=assert(new_size_request>Bigstring.lengthbuf);ifnew_size_request>t.max_buffer_sizethenraise_s[%sexp"Rpc_transport_low_latency: cannot grow buffer",{new_size_request:int;config=(t:t)}];letlen=Int.mint.max_buffer_size(Int.ceil_pow2new_size_request)inBigstring.unsafe_destroy_and_resizebuf~len;;endletset_nonblockingfd=Fd.with_file_descr_exnfdignore~nonblocking:truemoduleReader_internal=structtypet={fd:Fd.t;config:Config.t;mutablereading:bool;mutableclosed:bool;close_finished:unitIvar.t;mutablebuf:(Bigstring.t[@sexp.opaque]);mutablepos:int(* Start of unconsumed data. *);mutablemax:int(* End of unconsumed data. *)}[@@derivingsexp_of,fields]letcreatefdconfig=set_nonblockingfd;{fd;config;reading=false;closed=false;close_finished=Ivar.create();buf=Bigstring.createconfig.initial_buffer_size;pos=0;max=0};;letis_closedt=t.closedletclose_finishedt=Ivar.readt.close_finished(* Shift remaining unconsumed data to the beginning of the buffer *)letshift_unconsumedt=ift.pos>0then(letlen=t.max-t.posiniflen>0thenBigstring.blit~src:t.buf~dst:t.buf~src_pos:t.pos~dst_pos:0~len;t.pos<-0;t.max<-len);;letrefillt=shift_unconsumedt;letresult=Bigstring_unix.read_assume_fd_is_nonblocking(Fd.file_descr_exnt.fd)t.buf~pos:t.max~len:(Bigstring.lengtht.buf-t.max)inifUnix.Syscall_result.Int.is_okresultthen(matchUnix.Syscall_result.Int.ok_exnresultwith|0->`Eof|n->assert(n>0);t.max<-t.max+n;`Read_some)else(matchUnix.Syscall_result.Int.error_exnresultwith|EAGAIN|EWOULDBLOCK|EINTR->`Nothing_available|EPIPE|ECONNRESET|EHOSTUNREACH|ENETDOWN|ENETRESET|ENETUNREACH|ETIMEDOUT->`Eof|error->raise(Unix.Unix_error(error,"read","")));;(* To avoid allocating options in a relatively safe way. *)moduleMessage_len:sigtypet=privateintvalnone:tvalis_some:t->boolvalcreate_exn:int->t(* fails on negative ints *)valvalue_exn:t->intend=structtypet=intletnone=-1letis_somet=t>=0letcreate_exnn=ifn<0thenfailwithf"Message_len.create_exn of negative int: %d"n()elsen;;letvalue_exnt=ift<0thenfailwith"Message_len.value_exn of None"elsetend(* If one full message is available, returns its length (not including the
header). Returns [Message_len.none] otherwise. *)letget_payload_length_of_next_available_messaget=letpos=t.posinletavailable=t.max-posinifavailable>=Header.lengththen(letpayload_len=Header.unsafe_get_payload_lengtht.buf~posinlettotal_len=payload_len+Header.lengthinConfig.check_message_sizet.config~payload_len;iftotal_len<=availablethenMessage_len.create_exnpayload_lenelse(iftotal_len>Bigstring.lengtht.bufthent.buf<-Config.grow_buffert.configt.buf~new_size_request:total_len;Message_len.none))elseMessage_len.none;;moduleDispatcher=struct(* This module does a [Fd.every_ready_to] and takes care of exiting it when the
callback returns [Wait _]. *)type'astate=|Running|Stoppedof'astop_reasonand'astop_reason=|Handler_raised|Eof_reached(* Last handler call that wasn't determined immediately *)|Waiting_for_handlerofunitDeferred.t|Stopped_by_userof'atypenonrec'at={reader:t;on_message:Bigstring.t->pos:int->len:int->'aHandler_result.t;on_end_of_batch:unit->unit;interrupt:unitIvar.t(* To stop watching the file descriptor *);mutablestate:'astate}letis_runningt=matcht.statewith|Running->true|Stopped_->false;;letinterrupttreason=assert(is_runningt);t.state<-Stoppedreason;Ivar.fillt.interrupt();;letcan_process_messaget=(nott.reader.closed)&&is_runningtletrecprocess_received_messagest=ifcan_process_messagetthen(letlen=get_payload_length_of_next_available_messaget.readerinifMessage_len.is_somelenthen(letlen=Message_len.value_exnleninletstart=t.reader.pos+Header.lengthint.reader.pos<-start+len;matcht.on_messaget.reader.buf~pos:start~lenwith|Stopx->interruptt(Stopped_by_userx)|Continue->process_received_messagest|Waitd->ifDeferred.is_determineddthenprocess_received_messagestelseinterruptt(Waiting_for_handlerd))elset.on_end_of_batch());;letprocess_incomingt=ifcan_process_messagetthen(matchrefillt.readerwith|`Eof->interrupttEof_reached|`Nothing_available->()|`Read_some->process_received_messagest);;(* We want to stop reading/dispatching as soon as we get an error *)letstop_watching_on_errort~monitor=letparent=Monitor.current()inMonitor.detach_and_iter_errorsmonitor~f:(funexn->ifis_runningttheninterrupttHandler_raised;(* Let the monitor in effect when [dispatch] was called deal with the error. *)Monitor.send_exnparentexn);;letrecrunreader~on_message~on_end_of_batch=lett={reader;interrupt=Ivar.create();state=Running;on_message;on_end_of_batch}inletmonitor=Monitor.create~here:[%here]~name:"Rpc_transport_low_latency.Reader_internal.Dispatcher.run"()instop_watching_on_errort~monitor;Scheduler.within'~monitor(fun()->(* Process messages currently in the buffer. *)(* This will fill [t.interrupt] if [on_message] returns [Wait _]. However, we
expect [on_message] to almost never return [Wait _] with this transport, since
even the "non-copying" writes return [Deferred.unit]. *)process_received_messagest;letinterrupt=Deferred.any[Ivar.readt.interrupt;close_finishedt.reader]inFd.interruptible_every_ready_to~interruptt.reader.fd`Readprocess_incomingt)>>=function|`Bad_fd|`Unsupported->failwith"Rpc_transport_low_latency.Reader.read_forever: file descriptor doesn't \
support watching"|`Closed|`Interrupted->(matcht.statewith|Running->assert(Fd.is_closedt.reader.fd||t.reader.closed);return(Error`Closed)|Stopped(Stopped_by_userx)->return(Okx)|StoppedHandler_raised->(* The exception has been propagated, we only arrive here because we forced the
[every_ready_to] to be interrupted. *)Deferred.never()|StoppedEof_reached->return(Error`Eof)|Stopped(Waiting_for_handlerd)->d>>=fun()->ifreader.closedthenreturn(Error`Closed)elserunreader~on_message~on_end_of_batch);;endletread_forevert~on_message~on_end_of_batch=ift.closedthenfailwith"Rpc_transport_low_latency.Reader: reader closed";ift.readingthenfailwith"Rpc_transport_low_latency.Reader: already reading";t.reading<-true;Monitor.protect~here:[%here]~name:"Rpc_transport_low_latency.Reader_internal.read_forever"~finally:(fun()->t.reading<-false;Deferred.unit)(fun()->Dispatcher.runt~on_message~on_end_of_batch);;letcloset=ifnott.closedthen(t.closed<-true;Fd.closet.fd>>>fun()->Ivar.fillt.close_finished());close_finishedt;;endmoduleWriter_internal=structtypeflush={pos:Int63.t;ivar:unitIvar.t}[@@derivingsexp_of]letget_job_number()=Scheduler.num_jobs_run()moduleConnection_state:sigtypet[@@derivingsexp_of]valcreate:unit->tvalis_currently_accepting_writes:t->boolvalis_able_to_send_data:t->boolvalstart_close:t->unitvalfinish_close:t->fd_closed:unitDeferred.t->unitvalconnection_lost:t->unitvalclose_finished:t->unitDeferred.tvalstopped:t->unitDeferred.tend=structtypet={close_started:unitIvar.t;close_finished:unitIvar.t;connection_lost:unitIvar.t}[@@derivingsexp_of]letstart_closet=Ivar.fill_if_emptyt.close_started()letfinish_closet~fd_closed=start_closet;Ivar.fill_if_emptyt.connection_lost();uponfd_closed(Ivar.fill_if_emptyt.close_finished);;letclose_finishedt=Ivar.readt.close_finishedletis_currently_accepting_writest=Ivar.is_emptyt.close_startedletis_able_to_send_datat=Ivar.is_emptyt.connection_lostletconnection_lostt=Ivar.fill_if_emptyt.connection_lost()letstoppedt=Deferred.any[Ivar.readt.connection_lost;Ivar.readt.close_started];;letcreate()={close_started=Ivar.create();close_finished=Ivar.create();connection_lost=Ivar.create()};;endtypet={fd:Fd.t;config:Config.t;connection_state:Connection_state.t;mutablewriting:bool;mutablebuf:(Bigstring.t[@sexp.opaque]);mutablepos:int;mutablebytes_written:Int63.t;monitor:Monitor.t;flushes:flushQueue.t(* the job number of the job when we last sent data *);mutablelast_send_job:int;mutablesends_in_this_job:int}[@@derivingsexp_of,fields]letcreatefdconfig=set_nonblockingfd;{fd;config;writing=false;connection_state=Connection_state.create();buf=Bigstring.createconfig.initial_buffer_size;pos=0;bytes_written=Int63.zero;monitor=Monitor.create();flushes=Queue.create();last_send_job=0;sends_in_this_job=0};;letis_closedt=not(Connection_state.is_currently_accepting_writest.connection_state);;letclose_finishedt=Connection_state.close_finishedt.connection_stateletbytes_to_writet=t.posletstoppedt=Connection_state.stoppedt.connection_stateletflushedt=ift.pos=0thenDeferred.unitelseifnot(Connection_state.is_able_to_send_datat.connection_state)thenDeferred.never()else(letflush={pos=Int63.(+)t.bytes_written(Int63.of_intt.pos);ivar=Ivar.create()}inQueue.enqueuet.flushesflush;Ivar.readflush.ivar);;letready_to_write=flushedletdequeue_flushest=while(not(Queue.is_emptyt.flushes))&&Int63.(<=)(Queue.peek_exnt.flushes).post.bytes_writtendoIvar.fill(Queue.dequeue_exnt.flushes).ivar()done;;(* Discard the [n] first bytes of [t.buf] *)letdiscardtn=assert(n>=0&&n<=t.pos);letremaining=t.pos-ninifremaining>0thenBigstring.blit~src:t.buf~dst:t.buf~src_pos:n~dst_pos:0~len:remaining;t.pos<-remaining;t.bytes_written<-Int63.(+)t.bytes_written(Int63.of_intn);dequeue_flushest;;moduleError_kind=structtypet=|Write_blocked|Connection_lost|Other_errorendlethandle_errort(error:Unix.Error.t):Error_kind.t=matcherrorwith|EAGAIN|EWOULDBLOCK|EINTR->Write_blocked|EPIPE|ECONNRESET|EHOSTUNREACH|ENETDOWN|ENETRESET|ENETUNREACH|ETIMEDOUT->Connection_state.connection_lostt.connection_state;Connection_lost|_->Other_error;;moduleSingle_write_result=structtypet=|Continue|Stopendletsingle_writet:Single_write_result.t=matchBigstring_unix.write_assume_fd_is_nonblocking(Fd.file_descr_exnt.fd)t.buf~pos:0~len:t.poswith|n->discardtn;Continue|exception(Unix.Unix_error(error,_,_)asexn)->(matchhandle_errorterrorwith|Write_blocked->Continue|Connection_lost->Stop|Other_error->raiseexn);;letfinish_closet=letfd_closed=Fd.closet.fdint.writing<-false;Connection_state.finish_closet.connection_state~fd_closed;;letrecwrite_everythingt=matchsingle_writetwith|Stop->finish_closet|Continue->ift.pos=0then(t.writing<-false;ifis_closedtthenfinish_closet)elsewait_and_write_everythingtandwait_and_write_everythingt=Clock_ns.with_timeoutt.config.write_timeout(Fd.ready_tot.fd`Write)>>>funresult->ifnot(Connection_state.is_able_to_send_datat.connection_state)thenfinish_closetelse(matchresultwith|`Result`Ready->write_everythingt|`Timeout->Log.Global.sexp~level:`Error[%message"Rpc_transport_low_latency.Writer timed out waiting to write on file \
descriptor. Closing the writer."~timeout:(t.config.write_timeout:Time_ns.Span.t)(t:t)];finish_closet|`Result((`Bad_fd|`Closed)asresult)->raise_s[%sexp"Rpc_transport_low_latency.Writer: fd changed",{t:t;ready_to_result=(result:[`Bad_fd|`Closed])}]);;letflusht=if(nott.writing)&&t.pos>0then(t.writing<-true;Scheduler.within~monitor:t.monitor(fun()->write_everythingt));;letschedule_flusht=if(nott.writing)&&t.pos>0then(t.writing<-true;Scheduler.within~monitor:t.monitor(fun()->wait_and_write_everythingt));;letensure_at_leastt~needed=ifBigstring.lengtht.buf-t.pos<neededthen(letnew_size_request=t.pos+neededint.buf<-Config.grow_buffert.configt.buf~new_size_request);;letcopy_bytest~buf~pos~len=iflen>0then(ensure_at_leastt~needed:len;Bigstring.blit~src:buf~dst:t.buf~src_pos:pos~dst_pos:t.pos~len;t.pos<-t.pos+len);;(* Write what's in the internal buffer + bytes denoted by [(buf, pos, len)] *)letunsafe_send_bytest~buf~pos~len=letresult=writev2(Fd.file_descr_exnt.fd)~buf1:t.buf~pos1:0~len1:t.pos~buf2:buf~pos2:pos~len2:leninifUnix.Syscall_result.Int.is_okresultthen(letn=Unix.Syscall_result.Int.ok_exnresultinifn<=t.posthen((* We wrote less than what's in the internal buffer, discard what was written and
copy in the other buffer. *)discardtn;copy_bytest~buf~pos~len)else(letwritten_from_other_buf=n-t.posinletremaining_in_other_buf=len-written_from_other_bufindiscardtt.pos;ifremaining_in_other_buf>0thencopy_bytest~buf~pos:(pos+written_from_other_buf)~len:remaining_in_other_buf))else(leterror=Unix.Syscall_result.Int.error_exnresultinmatchhandle_errorterrorwith|Write_blocked->copy_bytest~buf~pos~len|Connection_lost->()|Other_error->letsyscall=iflen=0then"write"else"writev"inMonitor.send_exnt.monitor(Unix.Unix_error(error,syscall,"")));;letslow_write_bin_prot_and_bigstringt(writer:_Bin_prot.Type_class.writer)msg~buf~pos~len:_Send_result.t=letpayload_len=writer.sizemsg+leninlettotal_len=Header.length+payload_leninifConfig.message_size_okt.config~payload_lenthen(ensure_at_leastt~needed:total_len;Header.unsafe_set_payload_lengtht.buf~pos:t.pospayload_len;letstop=writer.writet.buf~pos:(t.pos+Header.length)msginassert(stop+len=t.pos+total_len);Bigstring.blit~src:buf~dst:t.buf~src_pos:pos~dst_pos:stop~len;t.pos<-stop+len;Sent())elseMessage_too_big{size=payload_len;max_message_size=t.config.max_message_size};;letshould_send_nowt=letcurrent_job=get_job_number()inifcurrent_job=t.last_send_jobthent.sends_in_this_job<-t.sends_in_this_job+1else(t.last_send_job<-current_job;t.sends_in_this_job<-1);t.pos>=t.config.buffering_threshold_in_bytes||t.sends_in_this_job<=t.config.start_batching_after_num_messages;;letsend_bin_prot_and_bigstringt(writer:_Bin_prot.Type_class.writer)msg~buf~pos~len:_Send_result.t=ifis_closedtthenClosedelse(Ordered_collection_common.check_pos_len_exn~pos~len~total_length:(Bigstring.lengthbuf);ifConnection_state.is_able_to_send_datat.connection_statethen(letsend_now=should_send_nowtinletresult=ifBigstring.lengtht.buf-t.pos<Header.lengththenslow_write_bin_prot_and_bigstringtwritermsg~buf~pos~lenelse(matchwriter.writet.buf~pos:(t.pos+Header.length)msgwith|exception_->(* It's likely that the exception is due to a buffer overflow, so resize the
internal buffer and try again. Technically we could match on
[Bin_prot.Common.Buffer_short] only, however we can't easily enforce that
custom bin_write_xxx functions raise this particular exception and not
[Invalid_argument] or [Failure] for instance. *)slow_write_bin_prot_and_bigstringtwritermsg~buf~pos~len|stop->letpayload_len=stop-(t.pos+Header.length)+leninifConfig.message_size_okt.config~payload_lenthen(Header.unsafe_set_payload_lengtht.buf~pos:t.pospayload_len;t.pos<-stop;ifsend_nowthen(letlen=iflen<128then(copy_bytest~buf~pos~len;0)elseleninunsafe_send_bytest~buf~pos~len)elsecopy_bytest~buf~pos~len;Sent())elseMessage_too_big{size=payload_len;max_message_size=t.config.max_message_size})inifsend_nowthenflushtelseschedule_flusht;result)elseSent());;letsent_deferred_unit=Send_result.SentDeferred.unitletsend_bin_prot_and_bigstring_non_copyingtwritermsg~buf~pos~len=matchsend_bin_prot_and_bigstringtwritermsg~buf~pos~lenwith|Sent()->sent_deferred_unit|(Closed|Message_too_big_)asr->r;;letdummy_buf=Bigstring.create0letsend_bin_prottwritermsg=send_bin_prot_and_bigstringtwritermsg~buf:dummy_buf~pos:0~len:0;;letcloset=ifnot(is_closedt)then(Connection_state.start_closet.connection_state;flusht;ifnott.writingthenfinish_closet);close_finishedt;;endletmake_createf?(config=Config.default)~max_message_sizefd=letmax_message_size=minconfig.max_message_sizemax_message_sizeinletconfig=Config.validate{configwithmax_message_size}inffdconfig;;moduleReader=structincludeKernel_transport.Readerletcreate_internalfdconfig=pack(moduleReader_internal)(Reader_internal.createfdconfig);;letcreate=make_createcreate_internalendmoduleWriter=structincludeKernel_transport.Writerletcreate_internalfdconfig=pack(moduleWriter_internal)(Writer_internal.createfdconfig);;letcreate=make_createcreate_internalendtypet=Kernel_transport.t={reader:Reader.t;writer:Writer.t}[@@derivingsexp_of]letclose=Kernel_transport.closeletcreate_internalfdconfig={reader=Reader.create_internalfdconfig;writer=Writer.create_internalfdconfig};;letcreate=make_createcreate_internal