Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file fd.ml
open Core
open Import
module File_descr = Unix.File_descr
module Scheduler = Raw_scheduler
module Fd = Raw_fd
include Fd.T
open Fd

(* Re-bind values that the opens above bring into scope so that they are defined by
   this module itself. *)
let debug = debug
let is_closed = is_closed
let is_open = is_open
let syscall = syscall
let syscall_exn = syscall_exn
let syscall_result_exn = syscall_result_exn
let with_file_descr = with_file_descr
let with_file_descr_exn = with_file_descr_exn

module Kind = struct
  include Fd.Kind

  (* Classify [file_descr] by calling [Unix.fstat] directly in the calling thread.
     Regular files, directories, block devices and symlinks are all reported as
     [File].  For sockets, [SO_ACCEPTCONN] distinguishes [`Passive] from [`Active]. *)
  let blocking_infer_using_stat file_descr =
    let st = Unix.fstat file_descr in
    match st.st_kind with
    | S_REG | S_DIR | S_BLK | S_LNK -> File
    | S_CHR -> Char
    | S_FIFO -> Fifo
    | S_SOCK ->
      Socket (if Unix.getsockopt file_descr SO_ACCEPTCONN then `Passive else `Active)
  ;;

  (* Same classification as [blocking_infer_using_stat], but performed through
     [In_thread.syscall_exn] and returned as a deferred. *)
  let infer_using_stat file_descr =
    In_thread.syscall_exn ~name:"fstat" (fun () -> blocking_infer_using_stat file_descr)
  ;;
end

(* Human-readable rendering of [t], via its sexp. *)
let to_string t = Sexp.to_string_hum (sexp_of_t t)

(* The scheduler used by every operation in this module. *)
let the_one_and_only () = Scheduler.the_one_and_only ~should_lock:true

(* [create ?avoid_nonblock_if_possible kind file_descr info] builds an fd by delegating
   to [Scheduler.create_fd] on the one-and-only scheduler. *)
let create ?avoid_nonblock_if_possible kind file_descr info =
  Scheduler.create_fd
    ?avoid_nonblock_if_possible
    (the_one_and_only ())
    kind
    file_descr
    info
;;

(* If possible, we try not to treat [stdin], [stdout], or [stderr] as nonblocking so that
   one can use Core I/O libraries simultaneously with async without them failing due to
   [Sys_blocked_io]. *)
let create_std_descr file_descr info =
  create
    (Kind.blocking_infer_using_stat file_descr)
    file_descr
    info
    ~avoid_nonblock_if_possible:true
;;

(* The standard descriptors are wrapped in [Memo.unit], so each is created on first
   use rather than at module initialization. *)
let stdin = Memo.unit (fun () -> create_std_descr Unix.stdin (Info.of_string "<stdin>"))

let stdout =
  Memo.unit (fun () -> create_std_descr Unix.stdout (Info.of_string "<stdout>"))
;;

let stderr =
  Memo.unit (fun () -> create_std_descr Unix.stderr (Info.of_string "<stderr>"))
;;

(* [clear_nonblock t] marks [t] as not supporting nonblocking mode.  If nonblocking had
   already been set on the underlying descriptor, it is also cleared with
   [Unix.clear_nonblock].  Does nothing when [t.supports_nonblock] is already false. *)
let clear_nonblock t =
  if t.supports_nonblock
  then (
    t.supports_nonblock <- false;
    if t.have_set_nonblock
    then (
      t.have_set_nonblock <- false;
      Unix.clear_nonblock t.file_descr))
;;

module Close = struct
  type socket_handling =
    | Shutdown_socket
    | Do_not_shutdown_socket

  type file_descriptor_handling =
    | Close_file_descriptor of socket_handling
    | Do_not_close_file_descriptor

  (* [close ?file_descriptor_handling t] requests that [t] be closed and returns a
     deferred read from [t.close_finished].

     Only the first call (state [Open]) does any work; when the state is already
     [Close_requested] or [Closed] the call just returns the same deferred.

     With [Close_file_descriptor socket_handling] (the default, using
     [Shutdown_socket]) the close step runs [Unix.close] in a thread.  For an active
     socket with [Shutdown_socket] it first runs [Unix.shutdown]; the close is the
     [~finally] of [Monitor.protect], so it runs even if the shutdown fails.  With
     [Do_not_close_file_descriptor] no system call is made. *)
  let close ?(file_descriptor_handling = Close_file_descriptor Shutdown_socket) t =
    if debug then Debug.log "Fd.close" t [%sexp_of: t];
    (match t.state with
     | Close_requested _ | Closed -> ()
     | Open close_started ->
       Ivar.fill close_started ();
       (* Not run here: this closure is stored in the [Close_requested] state below. *)
       let do_close_syscall () =
         don't_wait_for
           (let%map () =
              match file_descriptor_handling with
              | Do_not_close_file_descriptor -> return ()
              | Close_file_descriptor socket_handling ->
                Monitor.protect
                  ~finally:(fun () ->
                    In_thread.syscall_exn ~name:"close" (fun () ->
                      Unix.close t.file_descr))
                  (fun () ->
                    match t.kind, socket_handling with
                    | Socket `Active, Shutdown_socket ->
                      In_thread.syscall_exn ~name:"shutdown" (fun () ->
                        Unix.shutdown t.file_descr ~mode:SHUTDOWN_ALL)
                    | _ -> return ())
            in
            Ivar.fill t.close_finished ())
       in
       let scheduler = the_one_and_only () in
       let kernel_scheduler = scheduler.kernel_scheduler in
       set_state
         t
         (Close_requested
            (Kernel_scheduler.current_execution_context kernel_scheduler, do_close_syscall));
       (* Notify other users of this fd that it is going to be closed. *)
       Scheduler.request_stop_watching scheduler t `Read `Closed;
       Scheduler.request_stop_watching scheduler t `Write `Closed;
       (* If there are no syscalls in progress, then start closing the fd. *)
       Scheduler.maybe_start_closing_fd scheduler t);
    Ivar.read t.close_finished
  ;;
end

include Close

(* Deferred read from [t.close_finished], which the close step in [close] fills. *)
let close_finished t = Ivar.read t.close_finished

(* Deferred that is determined once [close] has been called on [t].  It is already
   determined when [t] is in state [Close_requested] or [Closed]. *)
let close_started t =
  match t.state with
  | Open close_started -> Ivar.read close_started
  | Close_requested _ | Closed -> return ()
;;

(* [with_close t ~f] runs [f t] and then closes [t], whether or not [f] succeeds. *)
let with_close t ~f = Monitor.protect (fun () -> f t) ~finally:(fun () -> close t)

(* [with_file_descr_deferred t f] runs the deferred computation [f] on the underlying
   descriptor.  The active-syscall count of [t] is incremented first and decremented
   once [f] has finished.

   Returns [`Already_closed] if the increment reports that [t] is closed, [`Error e] if
   [Monitor.try_with] reports an error from [f], and [`Ok x] otherwise. *)
let with_file_descr_deferred t f =
  match inc_num_active_syscalls t with
  | `Already_closed -> return `Already_closed
  | `Ok ->
    let%map result = Monitor.try_with (fun () -> f t.file_descr) in
    Scheduler.dec_num_active_syscalls_fd (the_one_and_only ()) t;
    (match result with
     | Ok x -> `Ok x
     | Error e -> `Error e)
;;

(* Like [with_file_descr_deferred], but raises instead of returning [`Error] or
   [`Already_closed]. *)
let with_file_descr_deferred_exn t f =
  match%map with_file_descr_deferred t f with
  | `Ok x -> x
  | `Error exn -> raise exn
  | `Already_closed ->
    raise_s [%message "Fd.with_file_descr_deferred_exn got closed fd" ~_:(t : t_hum)]
;;

(* Asks the scheduler to start watching [t] for [read_or_write].  Raises if the
   scheduler reports that [t] is already being watched for that direction; every other
   answer is passed back to the caller unchanged. *)
let start_watching t read_or_write watching =
  if debug
  then Debug.log "Fd.start_watching" (t, read_or_write) [%sexp_of: t * Read_write.Key.t];
  let r = the_one_and_only () in
  match Scheduler.request_start_watching r t read_or_write watching with
  | (`Unsupported | `Already_closed | `Watching) as x -> x
  | `Already_watching ->
    raise_s
      [%message
        "cannot watch an fd already being watched"
          ~fd:(t : t)
          ~scheduler:(r : Scheduler.t)]
;;

(* Waits for whichever comes first: [interrupt] or [ivar] being filled.  If [interrupt]
   wins and [ivar] is still empty, asks the scheduler to stop watching [t] with reason
   [`Interrupted]. *)
let stop_watching_upon_interrupt t read_or_write ivar ~interrupt =
  upon
    (choose
       [ choice interrupt (fun () -> `Interrupted)
       ; choice (Ivar.read ivar) (fun _ -> `Not_interrupted)
       ])
    (function
      | `Not_interrupted -> ()
      | `Interrupted ->
        if Ivar.is_empty ivar
        then
          Scheduler.request_stop_watching
            (the_one_and_only ())
            t
            read_or_write
            `Interrupted)
;;

(* One-shot readiness wait that can be interrupted.  When watching is unsupported for
   [t], the answer is immediate: [`Interrupted] if [interrupt] is already determined,
   [`Ready] otherwise. *)
let interruptible_ready_to t read_or_write ~interrupt =
  if debug
  then
    Debug.log
      "Fd.interruptible_ready_to"
      (t, read_or_write)
      [%sexp_of: t * Read_write.Key.t];
  let ready = Ivar.create () in
  match start_watching t read_or_write (Watch_once ready) with
  | `Already_closed -> return `Closed
  | `Unsupported ->
    return (if Deferred.is_determined interrupt then `Interrupted else `Ready)
  | `Watching ->
    stop_watching_upon_interrupt t read_or_write ready ~interrupt;
    Ivar.read ready
;;

(* One-shot readiness wait without an interrupt.  Reports [`Ready] immediately when
   watching is unsupported for [t]. *)
let ready_to t read_or_write =
  if debug then Debug.log "Fd.ready_to" (t, read_or_write) [%sexp_of: t * Read_write.Key.t];
  let ready = Ivar.create () in
  match start_watching t read_or_write (Watch_once ready) with
  | `Already_closed -> return `Closed
  | `Unsupported -> return `Ready
  | `Watching ->
    (match%map Ivar.read ready with
     | (`Bad_fd | `Closed | `Ready) as x -> x
     | `Interrupted -> (* impossible *) assert false)
;;

(* Registers a job built from [f x] to be run by the scheduler while [t] is watched
   for [read_or_write].  The returned deferred is determined when watching finishes,
   with the reason it finished ([interrupt] being one of them). *)
let interruptible_every_ready_to t read_or_write ~interrupt f x =
  if debug
  then
    Debug.log
      "Fd.interruptible_every_ready_to"
      (t, read_or_write)
      [%sexp_of: t * Read_write.Key.t];
  let job = Scheduler.(create_job (t ())) f x in
  let finished = Ivar.create () in
  match start_watching t read_or_write (Watch_repeatedly (job, finished)) with
  | `Already_closed -> return `Closed
  | `Unsupported -> return `Unsupported
  | `Watching ->
    stop_watching_upon_interrupt t read_or_write finished ~interrupt;
    (Ivar.read finished
      :> [ `Bad_fd | `Closed | `Unsupported | `Interrupted ] Deferred.t)
;;

(* Same as [interruptible_every_ready_to] but with no interrupt, so [`Interrupted] is
   never a valid reason for finishing. *)
let every_ready_to t read_or_write f x =
  if debug
  then Debug.log "Fd.every_ready_to" (t, read_or_write) [%sexp_of: t * Read_write.Key.t];
  let job = Scheduler.(create_job (t ())) f x in
  let finished = Ivar.create () in
  match start_watching t read_or_write (Watch_repeatedly (job, finished)) with
  | `Already_closed -> return `Closed
  | `Unsupported -> return `Unsupported
  | `Watching ->
    (match%map Ivar.read finished with
     | (`Bad_fd | `Closed) as x -> x
     | `Interrupted -> (* impossible *) assert false)
;;

(* [syscall_in_thread t ~name f] runs [f] on the descriptor via [In_thread.syscall],
   with [t] counted as having an active syscall for the duration.  An exception from
   [f] is returned as [`Error exn]; a closed [t] is reported as [`Already_closed]. *)
let syscall_in_thread t ~name f =
  match%map
    with_file_descr_deferred t (fun file_descr ->
      In_thread.syscall ~name (fun () -> f file_descr))
  with
  | `Error e ->
    (* [In_thread.syscall] catches any exceptions [f] can raise, so this can only be
       reached when there's something wrong with the [In_thread] machinery itself. *)
    raise_s
      [%message "Fd.syscall_in_thread problem -- please report this" name ~_:(e : exn)]
  | `Already_closed -> `Already_closed
  | `Ok x ->
    (match x with
     | Ok x -> `Ok x
     | Error exn -> `Error exn)
;;

(* Like [syscall_in_thread], but raises instead of returning [`Error] or
   [`Already_closed]. *)
let syscall_in_thread_exn t ~name f =
  match%map syscall_in_thread t ~name f with
  | `Ok x -> x
  | `Error exn -> raise exn
  | `Already_closed ->
    raise_s [%message "Fd.syscall_in_thread_exn of a closed fd" name ~_:(t : t_hum)]
;;

(* Wrap the descriptor behind a channel, with a caller-supplied [kind]. *)
let of_in_channel ic kind =
  create kind (Unix.descr_of_in_channel ic) (Info.of_string "<of_in_channel>")
;;

let of_out_channel oc kind =
  create kind (Unix.descr_of_out_channel oc) (Info.of_string "<of_out_channel>")
;;

(* Same as above, with the kind inferred via [Kind.infer_using_stat]. *)
let of_in_channel_auto ic =
  Kind.infer_using_stat (Unix.descr_of_in_channel ic) >>| of_in_channel ic
;;

let of_out_channel_auto oc =
  Kind.infer_using_stat (Unix.descr_of_out_channel oc) >>| of_out_channel oc
;;

(* Returns the underlying descriptor; raises if [t] is closed. *)
let file_descr_exn t =
  if is_closed t
  then raise_s [%message "Fd.file_descr_exn on already closed fd" ~_:(t : t)]
  else t.file_descr
;;

(* Integer value of the underlying descriptor; raises if [t] is closed. *)
let to_int_exn t = File_descr.to_int (file_descr_exn t)

module Private = struct
  (* [replace t kind info] overwrites the kind and info of [t] in place; raises if [t]
     is closed.  [`Set i] installs [i] as the new info.  [`Extend i] installs an info
     tagged "replaced" that carries both [i] and the previous info. *)
  let replace t kind info =
    if is_closed t
    then
      raise_s
        [%message
          "Fd.replace got closed fd"
            ~fd:(t : t)
            (kind : Kind.t)
            ~scheduler:(the_one_and_only () : Scheduler.t)]
    else (
      t.kind <- kind;
      t.info
      <- (match info with
          | `Set i -> i
          | `Extend i ->
            Info.create
              "replaced"
              (i, `previously_was t.info)
              [%sexp_of: Info.t * [ `previously_was of Info.t ]]))
  ;;
end