Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file lwt_process.ml
(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
   details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)

open Lwt.Infix

(* A command is a pair [(prog, args)]. When [prog] is the empty string the
   spawning functions below fall back on [args]: [unix_spawn] executes
   [args.(0)], and [win32_spawn] passes no program name to the process
   creation stub. *)
type command = string * string array

(* [shell cmd] builds the command that runs [cmd] through the system shell:
   [cmd.exe /c] when [Sys.win32] is true, [/bin/sh -c] otherwise.

   On Windows the command string is prefixed with a NUL character; this marks
   it for [win32_quote], which strips the marker and leaves the string
   unquoted. *)
let shell =
  if Sys.win32 then
    fun cmd -> ("", [|"cmd.exe"; "/c"; "\000" ^ cmd|])
  else
    fun cmd -> ("", [|"/bin/sh"; "-c"; cmd|])

(* How one of the standard descriptors of the child is set up:
   - [`Keep]: use the parent's descriptor;
   - [`Dev_null]: connect it to the null device;
   - [`Close]: close it (Unix) / pass no handle (Windows);
   - [`FD_copy fd]: use [fd], which stays open in the parent;
   - [`FD_move fd]: use [fd], which is closed in the parent after spawning. *)
type redirection =
  [ `Keep
  | `Dev_null
  | `Close
  | `FD_copy of Unix.file_descr
  | `FD_move of Unix.file_descr ]

(* +-----------------------------------------------------------------+
   | OS-dependent command spawning                                   |
   +-----------------------------------------------------------------+ *)

type proc = {
  id : int;
  (* The process id. *)
  fd : Unix.file_descr;
  (* A handle on Windows, and a dummy value on Unix. *)
}

(* [win32_get_fd fd redirection] returns the descriptor to hand to the child
   for the standard stream whose parent-side descriptor is [fd], or [None]
   for [`Close].

   For [`Dev_null] a new descriptor on "nul" is opened on every call; the
   caller owns it and must close it once the process has been created. *)
let win32_get_fd fd redirection =
  match redirection with
  | `Keep ->
    Some fd
  | `Dev_null ->
    Some (Unix.openfile "nul" [Unix.O_RDWR] 0o666)
  | `Close ->
    None
  | `FD_copy fd' ->
    Some fd'
  | `FD_move fd' ->
    Some fd'

(* C stub creating the process. As called from [win32_spawn], the arguments
   are: the optional program name, the command line, the optional environment
   block, and the optional (stdin, stdout, stderr) descriptors. *)
external win32_create_process :
  string option -> string -> string option ->
  (Unix.file_descr option * Unix.file_descr option * Unix.file_descr option) ->
  proc = "lwt_process_create_process"

(* Quote one argument for the command line. An argument starting with a NUL
   character (see [shell]) is returned verbatim without that marker; any
   other argument goes through [Filename.quote]. *)
let win32_quote arg =
  if String.length arg > 0 && arg.[0] = '\000' then
    String.sub arg 1 (String.length arg - 1)
  else
    Filename.quote arg

(* Spawn [(prog, args)] on Windows.

   [env], when given, is flattened into a single block in which every entry
   is followed by a NUL character and the whole block ends with one more NUL.
   The descriptors in [toclose] are flagged close-on-exec before the process
   is created.

   Once the process exists, the parent closes every descriptor whose
   ownership ended with the spawn: the ones passed as [`FD_move], and the
   "nul" descriptors that [win32_get_fd] opened for [`Dev_null]. *)
let win32_spawn
    (prog, args) env
    ?(stdin:redirection=`Keep)
    ?(stdout:redirection=`Keep)
    ?(stderr:redirection=`Keep)
    toclose
  =
  let cmdline = String.concat " " (List.map win32_quote (Array.to_list args)) in
  let env =
    match env with
    | None ->
      None
    | Some env ->
      (* Total size: each entry plus its NUL terminator, plus the final NUL
         (the initial value 1 of the fold). *)
      let len =
        Array.fold_left (fun len str -> String.length str + len + 1) 1 env in
      let res = Bytes.create len in
      let ofs =
        Array.fold_left
          (fun ofs str ->
             let len = String.length str in
             String.blit str 0 res ofs len;
             Bytes.set res (ofs + len) '\000';
             ofs + len + 1)
          0 env
      in
      Bytes.set res ofs '\000';
      (* [res] is never written to after this point. *)
      Some (Bytes.unsafe_to_string res)
  in
  List.iter Unix.set_close_on_exec toclose;
  let stdin_fd = win32_get_fd Unix.stdin stdin
  and stdout_fd = win32_get_fd Unix.stdout stdout
  and stderr_fd = win32_get_fd Unix.stderr stderr in
  let proc =
    win32_create_process
      (if prog = "" then None else Some prog) cmdline env
      (stdin_fd, stdout_fd, stderr_fd)
  in
  (* Release the parent's copy of a descriptor that was either moved to the
     child or opened here on "nul"; descriptors that are kept or copied stay
     open. *)
  let close_owned redirection fd_opt =
    match redirection with
    | `FD_move _ | `Dev_null ->
      begin match fd_opt with
        | Some fd -> Unix.close fd
        | None -> ()
      end
    | `Keep | `Close | `FD_copy _ ->
      ()
  in
  close_owned stdin stdin_fd;
  close_owned stdout stdout_fd;
  close_owned stderr stderr_fd;
  proc

(* C stub building the job that waits for the process behind the given
   handle; the job's result is used as the exit code by [win32_waitproc]. *)
external win32_wait_job : Unix.file_descr -> int Lwt_unix.job =
  "lwt_process_wait_job"

(* Wait for [proc] and return a triple shaped like the result of
   [unix_waitproc]: the process id, [WEXITED code], and a resource usage
   record whose times are both zero. *)
let win32_waitproc proc =
  Lwt_unix.run_job (win32_wait_job proc.fd) >>= fun code ->
  Lwt.return
    (proc.id,
     Lwt_unix.WEXITED code,
     {Lwt_unix.ru_utime = 0.; Lwt_unix.ru_stime = 0.})

external win32_terminate_process : Unix.file_descr -> int -> unit =
  "lwt_process_terminate_process"

(* Terminate [proc] through the C stub, passing 1 as second argument. *)
let win32_terminate proc =
  win32_terminate_process proc.fd 1

(* Apply [redirection] to the standard descriptor [fd]. This is called in the
   forked child, before the program is executed. *)
let unix_redirect fd redirection =
  match redirection with
  | `Keep ->
    ()
  | `Dev_null ->
    Unix.close fd;
    let dev_null = Unix.openfile "/dev/null" [Unix.O_RDWR] 0o666 in
    (* If the descriptor just opened is already [fd] there is nothing left
       to do; otherwise duplicate it onto [fd] and drop the extra one. *)
    if fd <> dev_null then begin
      Unix.dup2 dev_null fd;
      Unix.close dev_null
    end
  | `Close ->
    Unix.close fd
  | `FD_copy fd' ->
    Unix.dup2 fd' fd
  | `FD_move fd' ->
    Unix.dup2 fd' fd;
    Unix.close fd'

external sys_exit : int -> 'a = "caml_sys_exit"

(* Spawn [(prog, args)] on Unix by forking.

   In the child: the three standard descriptors are redirected, the
   descriptors in [toclose] are closed, and the program is executed with
   [Unix.execvp], or [Unix.execvpe] when [env] is given. If that raises, the
   child exits with code 127.

   In the parent: descriptors passed as [`FD_move] are closed and a [proc]
   holding the child's pid is returned. *)
let unix_spawn
    (prog, args) env
    ?(stdin:redirection=`Keep)
    ?(stdout:redirection=`Keep)
    ?(stderr:redirection=`Keep)
    toclose
  =
  let prog = if prog = "" && Array.length args > 0 then args.(0) else prog in
  match Lwt_unix.fork () with
  | 0 ->
    unix_redirect Unix.stdin stdin;
    unix_redirect Unix.stdout stdout;
    unix_redirect Unix.stderr stderr;
    List.iter Unix.close toclose;
    begin
      try
        match env with
        | None ->
          Unix.execvp prog args
        | Some env ->
          Unix.execvpe prog args env
      with _ ->
        (* Do not run at_exit hooks *)
        sys_exit 127
    end
  | id ->
    let close = function
      | `FD_move fd ->
        Unix.close fd
      | _ ->
        ()
    in
    close stdin;
    close stdout;
    close stderr;
    (* [fd] is only meaningful on Windows; [Unix.stdin] is a placeholder. *)
    {id; fd = Unix.stdin}

let unix_waitproc proc = Lwt_unix.wait4 [] proc.id

let unix_terminate proc = Unix.kill proc.id Sys.sigkill

(* Platform dispatch, decided once from [Sys.win32]. *)
let spawn = if Sys.win32 then win32_spawn else unix_spawn
let waitproc = if Sys.win32 then win32_waitproc else unix_waitproc
let terminate = if Sys.win32 then win32_terminate else unix_terminate

(* +-----------------------------------------------------------------+
   | Objects                                                         |
   +-----------------------------------------------------------------+ *)

type state =
  | Running
  | Exited of Unix.process_status

(* Projections on the [(pid, status, rusage)] triple produced by
   [waitproc]. *)
let status (_pid, status, _rusage) = status
let rusage (_pid, _status, rusage) = rusage

external cast_chan : 'a Lwt_io.channel -> unit Lwt_io.channel = "%identity"
(* Transform a channel into a channel that only supports closing. *)

(* Start closing [chan] and discard the resulting promise. *)
let ignore_close chan = ignore (Lwt_io.close chan)

(* Behaviour shared by all process objects.

   [timeout] is an optional delay after which the process is terminated and
   closed, [proc] is the spawned process and [channels] are the channels
   closed by [close]. *)
class virtual common timeout proc channels =
  let wait = waitproc proc in
  object(self)
    val mutable closed = false

    method pid = proc.id

    method state =
      match Lwt.poll wait with
      | None -> Running
      | Some (_pid, status, _rusage) -> Exited status

    (* [kill] and [terminate] do nothing once [wait] has resolved. *)
    method kill signum =
      if Lwt.state wait = Lwt.Sleep then
        Unix.kill proc.id signum

    method terminate =
      if Lwt.state wait = Lwt.Sleep then
        terminate proc

    (* The channels are closed by the first call only; every call then
       returns [self#status]. *)
    method close =
      if closed then
        self#status
      else begin
        closed <- true;
        Lwt.protected (Lwt.join (List.map Lwt_io.close channels))
        >>= fun () -> self#status
      end

    method status = Lwt.protected wait >|= status
    method rusage = Lwt.protected wait >|= rusage

    initializer
      (* Ensure channels are closed when no longer used. *)
      List.iter (Gc.finalise ignore_close) channels;
      (* Handle timeout. *)
      match timeout with
      | None ->
        ()
      | Some dt ->
        ignore (
          (* Ignore errors since they can be obtained by
             self#close. *)
          Lwt.try_bind
            (fun () ->
               (* [false] when the delay elapses first, [true] when the
                  process ends first. *)
               Lwt.choose [(Lwt_unix.sleep dt >>= fun () -> Lwt.return_false);
                           (wait >>= fun _ -> Lwt.return_true)])
            (function
              | true ->
                Lwt.return_unit
              | false ->
                self#terminate;
                self#close >>= fun _ -> Lwt.return_unit)
            (fun _ ->
               (* The exception is dropped because it can be
                  obtained with self#close. *)
               Lwt.return_unit)
        )
  end

(* Process with no pipe to the parent. *)
class process_none ?timeout ?env ?stdin ?stdout ?stderr cmd =
  let proc = spawn cmd env ?stdin ?stdout ?stderr [] in
  object
    inherit common timeout proc []
  end

(* Process whose standard output is a pipe read by the parent. The write end
   is moved to the child; the read end is listed in [toclose]. *)
class process_in ?timeout ?env ?stdin ?stderr cmd =
  let stdout_r, stdout_w = Unix.pipe () in
  let proc =
    spawn cmd env ?stdin ~stdout:(`FD_move stdout_w) ?stderr [stdout_r] in
  let stdout = Lwt_io.of_unix_fd ~mode:Lwt_io.input stdout_r in
  object
    inherit common timeout proc [cast_chan stdout]
    method stdout = stdout
  end

(* Process whose standard input is a pipe written by the parent. The read end
   is moved to the child; the write end is listed in [toclose]. *)
class process_out ?timeout ?env ?stdout ?stderr cmd =
  let stdin_r, stdin_w = Unix.pipe () in
  let proc =
    spawn cmd env ~stdin:(`FD_move stdin_r) ?stdout ?stderr [stdin_w] in
  let stdin = Lwt_io.of_unix_fd ~mode:Lwt_io.output stdin_w in
  object
    inherit common timeout proc [cast_chan stdin]
    method stdin = stdin
  end

(* Process with pipes on both standard input and standard output. *)
class process ?timeout ?env ?stderr cmd =
  let stdin_r, stdin_w = Unix.pipe ()
  and stdout_r, stdout_w = Unix.pipe () in
  let proc =
    spawn
      cmd env
      ~stdin:(`FD_move stdin_r)
      ~stdout:(`FD_move stdout_w)
      ?stderr
      [stdin_w; stdout_r]
  in
  let stdin = Lwt_io.of_unix_fd ~mode:Lwt_io.output stdin_w
  and stdout = Lwt_io.of_unix_fd ~mode:Lwt_io.input stdout_r in
  object
    inherit common timeout proc [cast_chan stdin; cast_chan stdout]
    method stdin = stdin
    method stdout = stdout
  end

(* Process with pipes on standard input, standard output and standard
   error. *)
class process_full ?timeout ?env cmd =
  let stdin_r, stdin_w = Unix.pipe ()
  and stdout_r, stdout_w = Unix.pipe ()
  and stderr_r, stderr_w = Unix.pipe () in
  let proc =
    spawn
      cmd env
      ~stdin:(`FD_move stdin_r)
      ~stdout:(`FD_move stdout_w)
      ~stderr:(`FD_move stderr_w)
      [stdin_w; stdout_r; stderr_r]
  in
  let stdin = Lwt_io.of_unix_fd ~mode:Lwt_io.output stdin_w
  and stdout = Lwt_io.of_unix_fd ~mode:Lwt_io.input stdout_r
  and stderr = Lwt_io.of_unix_fd ~mode:Lwt_io.input stderr_r in
  object
    inherit
      common timeout proc [cast_chan stdin; cast_chan stdout; cast_chan stderr]
    method stdin = stdin
    method stdout = stdout
    method stderr = stderr
  end

(* Function wrappers around the class constructors. *)

let open_process_none ?timeout ?env ?stdin ?stdout ?stderr cmd =
  new process_none ?timeout ?env ?stdin ?stdout ?stderr cmd

let open_process_in ?timeout ?env ?stdin ?stderr cmd =
  new process_in ?timeout ?env ?stdin ?stderr cmd

let open_process_out ?timeout ?env ?stdout ?stderr cmd =
  new process_out ?timeout ?env ?stdout ?stderr cmd

let open_process ?timeout ?env ?stderr cmd =
  new process ?timeout ?env ?stderr cmd

let open_process_full ?timeout ?env cmd =
  new process_full ?timeout ?env cmd

(* [make_with backend ?timeout ?env cmd f] opens a process with [backend],
   applies [f] to it, and closes the process when the promise returned by
   [f] completes, whether it succeeds or fails. The status returned by
   [close] is discarded. *)
let make_with backend ?timeout ?env cmd f =
  let process = backend ?timeout ?env cmd in
  Lwt.finalize
    (fun () -> f process)
    (fun () ->
       process#close >>= fun _ ->
       Lwt.return_unit)

let with_process_none ?timeout ?env ?stdin ?stdout ?stderr cmd f =
  make_with (open_process_none ?stdin ?stdout ?stderr) ?timeout ?env cmd f

let with_process_in ?timeout ?env ?stdin ?stderr cmd f =
  make_with (open_process_in ?stdin ?stderr) ?timeout ?env cmd f

let with_process_out ?timeout ?env ?stdout ?stderr cmd f =
  make_with (open_process_out ?stdout ?stderr) ?timeout ?env cmd f

let with_process ?timeout ?env ?stderr cmd f =
  make_with (open_process ?stderr) ?timeout ?env cmd f

let with_process_full ?timeout ?env cmd f =
  make_with open_process_full ?timeout ?env cmd f

(* +-----------------------------------------------------------------+
   | High-level functions                                            |
   +-----------------------------------------------------------------+ *)

(* Run [cmd] without any pipe and return its exit status. *)
let exec ?timeout ?env ?stdin ?stdout ?stderr cmd =
  (open_process_none ?timeout ?env ?stdin ?stdout ?stderr cmd)#close

(* [read_opt read ic] applies [read] to [ic] and wraps the result in [Some].
   [End_of_file] and an [EPIPE] Unix error are turned into [None]; any other
   exception is propagated. *)
let read_opt read ic =
  Lwt.catch
    (fun () -> read ic >|= fun x -> Some x)
    (function
      | Unix.Unix_error (Unix.EPIPE, _, _) | End_of_file ->
        Lwt.return_none
      | exn -> Lwt.fail exn) [@ocaml.warning "-4"]

(* Stream of the characters read from the standard output of [pr]. The
   channel is closed when the end of input is reached, and a finaliser closes
   it if it is dropped earlier. *)
let recv_chars pr =
  let ic = pr#stdout in
  Gc.finalise ignore_close ic;
  Lwt_stream.from (fun _ ->
    read_opt Lwt_io.read_char ic >>= fun x ->
    if x = None then begin
      Lwt_io.close ic >>= fun () ->
      Lwt.return x
    end else
      Lwt.return x)

(* Same as [recv_chars], reading line by line. *)
let recv_lines pr =
  let ic = pr#stdout in
  Gc.finalise ignore_close ic;
  Lwt_stream.from (fun _ ->
    read_opt Lwt_io.read_line ic >>= fun x ->
    if x = None then begin
      Lwt_io.close ic >>= fun () ->
      Lwt.return x
    end else
      Lwt.return x)

(* Read the standard output of [pr] with [Lwt_io.read], then close the
   channel. *)
let recv pr =
  let ic = pr#stdout in
  Lwt.finalize
    (fun () -> Lwt_io.read ic)
    (fun () -> Lwt_io.close ic)

(* Read one line from the standard output of [pr], then close the channel. *)
let recv_line pr =
  let ic = pr#stdout in
  Lwt.finalize
    (fun () -> Lwt_io.read_line ic)
    (fun () -> Lwt_io.close ic)

(* [send f pr data] writes [data] on the standard input of [pr] using [f],
   then closes the channel. *)
let send f pr data =
  let oc = pr#stdin in
  Lwt.finalize
    (fun () -> f oc data)
    (fun () -> Lwt_io.close oc)

(* Receiving *)

let pread ?timeout ?env ?stdin ?stderr cmd =
  recv (open_process_in ?timeout ?env ?stdin ?stderr cmd)

let pread_chars ?timeout ?env ?stdin ?stderr cmd =
  recv_chars (open_process_in ?timeout ?env ?stdin ?stderr cmd)

let pread_line ?timeout ?env ?stdin ?stderr cmd =
  recv_line (open_process_in ?timeout ?env ?stdin ?stderr cmd)

let pread_lines ?timeout ?env ?stdin ?stderr cmd =
  recv_lines (open_process_in ?timeout ?env ?stdin ?stderr cmd)

(* Sending *)

let pwrite ?timeout ?env ?stdout ?stderr cmd text =
  send Lwt_io.write (open_process_out ?timeout ?env ?stdout ?stderr cmd) text

let pwrite_chars ?timeout ?env ?stdout ?stderr cmd chars =
  send
    Lwt_io.write_chars
    (open_process_out ?timeout ?env ?stdout ?stderr cmd)
    chars

let pwrite_line ?timeout ?env ?stdout ?stderr cmd line =
  send
    Lwt_io.write_line
    (open_process_out ?timeout ?env ?stdout ?stderr cmd)
    line

let pwrite_lines ?timeout ?env ?stdout ?stderr cmd lines =
  send
    Lwt_io.write_lines
    (open_process_out ?timeout ?env ?stdout ?stderr cmd)
    lines

(* Mapping *)

(* State of [monitor]: [Init] while the sender is being watched, [Save t]
   when the sender has failed and [t] is the pending read to deliver next,
   [Done] once the sender no longer needs watching. *)
type 'a map_state =
  | Init
  | Save of 'a option Lwt.t
  | Done

(* Monitor the thread [sender] in the stream [st] so write errors are
   reported. *)
let monitor sender st =
  let sender = sender >|= fun () -> None in
  let state = ref Init in
  Lwt_stream.from
    (fun () ->
       match !state with
       | Init ->
         let getter = Lwt.apply Lwt_stream.get st in
         let result _ =
           match Lwt.state sender with
           | Lwt.Sleep ->
             (* The sender is still sleeping, behave as the
                getter. *)
             getter
           | Lwt.Return _ ->
             (* The sender terminated successfully, we are
                done monitoring it. *)
             state := Done;
             getter
           | Lwt.Fail _ ->
             (* The sender failed, behave as the sender for
                this element and save current getter. *)
             state := Save getter;
             sender
         in
         Lwt.try_bind (fun () -> Lwt.choose [sender; getter]) result result
       | Save t ->
         state := Done;
         t
       | Done ->
         Lwt_stream.get st)

(* Write [text] on the standard input of [cmd] and return everything it
   prints on its standard output. *)
let pmap ?timeout ?env ?stderr cmd text =
  let pr = open_process ?timeout ?env ?stderr cmd in
  (* Start the sender and getter at the same time. *)
  let sender = send Lwt_io.write pr text in
  let getter = recv pr in
  Lwt.catch
    (fun () ->
       (* Wait for both to terminate, returning the result of the
          getter. *)
       sender >>= fun () -> getter)
    (function
      | Lwt.Canceled as exn ->
        (* Cancel the getter if the sender was canceled. *)
        Lwt.cancel getter;
        Lwt.fail exn
      | exn -> Lwt.fail exn)

let pmap_chars ?timeout ?env ?stderr cmd chars =
  let pr = open_process ?timeout ?env ?stderr cmd in
  let sender = send Lwt_io.write_chars pr chars in
  monitor sender (recv_chars pr)

(* Write [line] on the standard input of [cmd] and return the first line it
   prints on its standard output. *)
let pmap_line ?timeout ?env ?stderr cmd line =
  let pr = open_process ?timeout ?env ?stderr cmd in
  (* Start the sender and getter at the same time. *)
  let sender = send Lwt_io.write_line pr line in
  let getter = recv_line pr in
  Lwt.catch
    (fun () ->
       (* Wait for both to terminate, returning the result of the
          getter. *)
       sender >>= fun () -> getter)
    (function
      | Lwt.Canceled as exn ->
        (* Cancel the getter if the sender was canceled. *)
        Lwt.cancel getter;
        Lwt.fail exn
      | exn -> Lwt.fail exn)

let pmap_lines ?timeout ?env ?stderr cmd lines =
  let pr = open_process ?timeout ?env ?stderr cmd in
  let sender = send Lwt_io.write_lines pr lines in
  monitor sender (recv_lines pr)