Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file lwt_throttle.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118(* This file is part of Lwt, released under the MIT license. See LICENSE.md for
details, or visit https://github.com/ocsigen/lwt/blob/master/LICENSE.md. *)openLwt.InfixmoduletypeS=sigtypekeytypetvalcreate:rate:int->max:int->n:int->tvalwait:t->key->boolLwt.tendmoduleMake(H:Hashtbl.HashedType):(Swithtypekey=H.t)=structmoduleMH=Hashtbl.Make(H)typekey=H.ttypeelt={mutableconsumed:int;queue:boolLwt.uQueue.t;}typet={rate:int;max:int;(* maximum number of waiting threads *)mutablewaiting:int;table:eltMH.t;mutablecleaning:unitLwt.toption;}letcreate~rate~max~n=ifrate<1||max<1||n<0theninvalid_arg"Lwt_throttle.S.create"else{rate=rate;max=max;waiting=0;table=MH.createn;cleaning=None;}letupdate_keytkeyelt(old_waiting,to_run)=letrecupdateto_run=function|0->0,Queue.lengthelt.queue,to_run|i->tryletto_run=(Queue.takeelt.queue)::to_runinupdateto_run(i-1)with|Queue.Empty->i,0,to_runinletnot_consumed,waiting,to_run=updateto_runt.rateinletconsumed=t.rate-not_consumedinifconsumed=0then(* there is no waiting threads for this key: we can clean the table *)MH.removet.tablekeyelseelt.consumed<-consumed;(old_waiting+waiting,to_run)letrecclean_tablet=letwaiting,to_run=MH.fold(update_keyt)t.table(0,[])int.waiting<-waiting;ifwaiting=0&&to_run=[]then(* the table is empty: we do not need to clean in 1 second *)t.cleaning<-Noneelselaunch_cleaningt;List.iter(funu->Lwt.wakeuputrue)to_runandlaunch_cleaningt=t.cleaning<-lett=Lwt_unix.sleep1.>>=fun()->Lwt.catch(fun()->clean_tablet;Lwt.return_unit)(fun_exn->(* Not good practice, but not worse than the code it is
replacing. *)prerr_endline"internal error";Printexc.print_backtracestderr;Lwt.return())inSometletreally_waittelt=letw,u=Lwt.task()inift.max>t.waitingthen(Queue.adduelt.queue;t.waiting<-succt.waiting;w)elseLwt.return_falseletwaittkey=letres=tryletelt=MH.findt.tablekeyinifelt.consumed>=t.ratethenreally_waitteltelse(elt.consumed<-succelt.consumed;Lwt.return_true)with|Not_found->letelt={consumed=1;queue=Queue.create()}inMH.addt.tablekeyelt;Lwt.return_truein(matcht.cleaningwith|None->launch_cleaningt|Some_->());resend