Legend:
Page
Library
Module
Module type
Parameter
Class
Class type
Source
Source file watch.ml
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328(*
* Copyright (c) 2017 Thomas Gazagnaire <thomas@gazagnaire.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import
include Watch_intf

let src = Logs.Src.create "irmin.watch" ~doc:"Irmin watch notifications"

module Log = (val Logs.src_log src : Logs.LOG)

(* Default listen hook: report that no hook was installed, then fail. *)
let none _ _ =
  Printf.eprintf "Listen hook not set!\n%!";
  assert false

(* Hook called by [Make.listen_dir] to start listening; replaced with
   [set_listen_dir_hook]. *)
let listen_dir_hook = ref none

(* A hook receives the watch manager id, the directory name and a per-file
   callback; it returns the function used to stop listening. *)
type hook =
  int -> string -> (string -> unit Lwt.t) -> (unit -> unit Lwt.t) Lwt.t

let set_listen_dir_hook (h : hook) = listen_dir_hook := h

(* [id ()] is a fresh generator; each call of the generator returns the next
   integer, starting at 1. *)
let id () =
  let c = ref 0 in
  fun () ->
    incr c;
    !c

(* Generator for watch manager ids (see [Make.v]). *)
let global = id ()

(* Number of notification workers currently started by [scheduler]. *)
let workers_r = ref 0
let workers () = !workers_r

(* [scheduler ()] is [(clean, enqueue)].

   [enqueue f] queues [f]; queued functions are run one after the other by a
   worker that is started lazily on the first [enqueue]. [clean ()] closes
   the queue of the current worker, if any, so that a later [enqueue] starts
   a new one. *)
let scheduler () =
  let p = ref None in
  let niet () = () in
  let c = ref niet in
  let push elt =
    match !p with
    | Some p -> p elt
    | None ->
        let stream, push = Lwt_stream.create () in
        incr workers_r;
        Lwt.async (fun () ->
            (* FIXME: we would like to skip some updates if more recent ones
               are at the back of the queue. *)
            Lwt_stream.iter_s (fun f -> f ()) stream);
        p := Some push;
        (c := fun () -> push None);
        push elt
  in
  let clean () =
    (* Only tear down when a worker was actually started. [clean] is also
       called when nothing was ever enqueued (e.g. a watch that is removed
       before any notification) or twice in a row; decrementing
       unconditionally would then drive [workers_r] below zero. *)
    match !p with
    | None -> ()
    | Some _ ->
        !c ();
        decr workers_r;
        c := niet;
        p := None
  in
  let enqueue v = push (Some v) in
  (clean, enqueue)

module Make (K : sig
  type t

  val t : t Type.t
end) (V : sig
  type t

  val t : t Type.t
end) =
struct
  type key = K.t
  type value = V.t
  type watch = int

  module KMap = Map.Make (struct
    type t = K.t

    let compare = Type.(unstage (compare K.t))
  end)

  module IMap = Map.Make (struct
    type t = int

    let compare (x : int) (y : int) = compare x y
  end)

  type key_handler = value Diff.t -> unit Lwt.t
  type all_handler = key -> value Diff.t -> unit Lwt.t

  let pp_value = Type.pp V.t
  let equal_opt_values = Type.(unstage (equal (option V.t)))
  let equal_keys = Type.(unstage (equal K.t))

  type t = {
    id : int; (* unique watch manager id. *)
    lock : Lwt_mutex.t; (* protect [keys] and [glob]. *)
    mutable next : int; (* next id, to identify watch handlers. *)
    mutable keys : (key * value option * key_handler) IMap.t;
    (* key handlers. *)
    mutable glob : (value KMap.t * all_handler) IMap.t;
    (* global handlers. *)
    enqueue : (unit -> unit Lwt.t) -> unit; (* enqueue notifications. *)
    clean : unit -> unit; (* destroy the notification thread. *)
    mutable listeners : int; (* number of listeners. *)
    mutable stop_listening : unit -> unit Lwt.t;
    (* clean-up listen resources. *)
    mutable notifications : int; (* number of notifications. *)
  }

  (* [stats t] is (number of key handlers, number of global handlers). *)
  let stats t = (IMap.cardinal t.keys, IMap.cardinal t.glob)

  let to_string t =
    let k, a = stats t in
    Printf.sprintf "[%d: %dk/%dg|%d]" t.id k a t.listeners

  (* Allocate the next handler id. *)
  let next t =
    let id = t.next in
    t.next <- id + 1;
    id

  let is_empty t = IMap.is_empty t.keys && IMap.is_empty t.glob

  (* Drop every handler and reset the handler id counter. The caller must
     hold [t.lock]. *)
  let clear_unsafe t =
    t.keys <- IMap.empty;
    t.glob <- IMap.empty;
    t.next <- 0

  let clear t =
    Lwt_mutex.with_lock t.lock (fun () ->
        clear_unsafe t;
        Lwt.return_unit)

  (* Create an empty watch manager with its own scheduler and a fresh id. *)
  let v () =
    let lock = Lwt_mutex.create () in
    let clean, enqueue = scheduler () in
    {
      lock;
      clean;
      enqueue;
      id = global ();
      next = 0;
      keys = IMap.empty;
      glob = IMap.empty;
      listeners = 0;
      stop_listening = (fun () -> Lwt.return_unit);
      notifications = 0;
    }

  (* Remove handler [id] from both handler tables. The caller must hold
     [t.lock]. *)
  let unwatch_unsafe t id =
    Log.debug (fun f -> f "unwatch %s: id=%d" (to_string t) id);
    let glob = IMap.remove id t.glob in
    let keys = IMap.remove id t.keys in
    t.glob <- glob;
    t.keys <- keys

  (* Remove handler [id]; once no handler is left, release the scheduler. *)
  let unwatch t id =
    Lwt_mutex.with_lock t.lock (fun () ->
        unwatch_unsafe t id;
        if is_empty t then t.clean ();
        Lwt.return_unit)

  (* Build the diff between an old and a new optional value. Callers only
     use it once the two values are known to differ, hence the [assert]. *)
  let mk old value =
    match (old, value) with
    | None, None -> assert false
    | Some v, None -> `Removed v
    | None, Some v -> `Added v
    | Some x, Some y -> `Updated (x, y)

  (* [protect f] runs [f] and logs, instead of propagating, any exception it
     raises. *)
  let protect f () =
    Lwt.catch f (fun e ->
        Log.err (fun l ->
            l "watch callback got: %a\n%s" Fmt.exn e
              (Printexc.get_backtrace ()));
        Lwt.return_unit)

  let pp_option = Fmt.option ~none:(Fmt.any "<none>")
  let pp_key = Type.pp K.t

  (* Notify the global handlers that [key] now maps to [value]. For each
     handler whose recorded value for [key] differs from [value], the record
     is updated and the callback is queued on the scheduler. The caller must
     hold [t.lock]. *)
  let notify_all_unsafe t key value =
    let todo = ref [] in
    let glob =
      IMap.fold
        (fun id ((init, f) as arg) acc ->
          let fire old_value =
            todo :=
              protect (fun () ->
                  Log.debug (fun f ->
                      f "notify-all[%d.%d:%a]: %d firing! (%a -> %a)" t.id id
                        pp_key key t.notifications (pp_option pp_value)
                        old_value (pp_option pp_value) value);
                  t.notifications <- t.notifications + 1;
                  f key (mk old_value value))
              :: !todo;
            (* Record the new state for this handler. *)
            let init =
              match value with
              | None -> KMap.remove key init
              | Some v -> KMap.add key v init
            in
            IMap.add id (init, f) acc
          in
          let old_value =
            try Some (KMap.find key init) with Not_found -> None
          in
          if equal_opt_values old_value value then (
            Log.debug (fun f ->
                f "notify-all[%d:%d:%a]: same value, skipping." t.id id pp_key
                  key);
            IMap.add id arg acc)
          else fire old_value)
        t.glob IMap.empty
    in
    t.glob <- glob;
    match !todo with
    | [] -> ()
    | ts -> t.enqueue (fun () -> Lwt_list.iter_p (fun x -> x ()) ts)

  (* Notify the handlers registered on [key] that it now maps to [value].
     Handlers on other keys, or whose recorded value equals [value], are
     left untouched. The caller must hold [t.lock]. *)
  let notify_key_unsafe t key value =
    let todo = ref [] in
    let keys =
      IMap.fold
        (fun id ((k, old_value, f) as arg) acc ->
          if not (equal_keys key k) then IMap.add id arg acc
          else if equal_opt_values value old_value then (
            Log.debug (fun f ->
                f "notify-key[%d.%d:%a]: same value, skipping." t.id id pp_key
                  key);
            IMap.add id arg acc)
          else (
            todo :=
              protect (fun () ->
                  Log.debug (fun f ->
                      f "notify-key[%d:%d:%a] %d firing! (%a -> %a)" t.id id
                        pp_key key t.notifications (pp_option pp_value)
                        old_value (pp_option pp_value) value);
                  t.notifications <- t.notifications + 1;
                  f (mk old_value value))
              :: !todo;
            IMap.add id (k, value, f) acc))
        t.keys IMap.empty
    in
    t.keys <- keys;
    match !todo with
    | [] -> ()
    | ts -> t.enqueue (fun () -> Lwt_list.iter_p (fun x -> x ()) ts)

  (* Notify every interested handler that [key] now maps to [value]. Does
     nothing when no handler is registered. *)
  let notify t key value =
    Lwt_mutex.with_lock t.lock (fun () ->
        if is_empty t then Lwt.return_unit
        else (
          notify_all_unsafe t key value;
          notify_key_unsafe t key value;
          Lwt.return_unit))

  (* Register [f] on [key], with [init] as the value known so far, and
     return the handler id. The caller must hold [t.lock]. *)
  let watch_key_unsafe t key ?init f =
    let id = next t in
    Log.debug (fun f -> f "watch-key %s: id=%d" (to_string t) id);
    t.keys <- IMap.add id (key, init, f) t.keys;
    id

  let watch_key t key ?init f =
    Lwt_mutex.with_lock t.lock (fun () ->
        let id = watch_key_unsafe t ?init key f in
        Lwt.return id)

  (* Build a key map from an association list; later bindings of the same
     key override earlier ones. *)
  let kmap_of_alist l =
    List.fold_left (fun map (k, v) -> KMap.add k v map) KMap.empty l

  (* Register the global handler [f], with [init] as the bindings known so
     far, and return the handler id. The caller must hold [t.lock]. *)
  let watch_unsafe t ?(init = []) f =
    let id = next t in
    Log.debug (fun f -> f "watch %s: id=%d" (to_string t) id);
    t.glob <- IMap.add id (kmap_of_alist init, f) t.glob;
    id

  let watch t ?init f =
    Lwt_mutex.with_lock t.lock (fun () ->
        let id = watch_unsafe t ?init f in
        Lwt.return id)

  (* [listen_dir t dir ~key ~value] starts listening to [dir] through
     [listen_dir_hook] unless [t] already has listeners, and returns the
     function that unregisters this listener. [key] maps a file name to a
     key ([None] means the file is ignored) and [value] reads the current
     value of a key.

     NOTE(review): [t.listeners] is only incremented once [init ()] has
     resolved, so two overlapping calls could both install a hook; confirm
     that callers serialise calls to [listen_dir]. *)
  let listen_dir t dir ~key ~value =
    let init () =
      if t.listeners = 0 then (
        Log.debug (fun f -> f "%s: start listening to %s" (to_string t) dir);
        let+ f =
          !listen_dir_hook t.id dir (fun file ->
              match key file with
              | None -> Lwt.return_unit
              | Some key ->
                  (* Read again whenever a notification was fired while the
                     value was being read. *)
                  let rec read n =
                    let* value = value key in
                    let n' = t.notifications in
                    if n = n' then notify t key value
                    else (
                      Log.debug (fun l ->
                          l "Stale event, trying reading again");
                      read n')
                  in
                  read t.notifications)
        in
        t.stop_listening <- f)
      else (
        Log.debug (fun f -> f "%s: already listening on %s" (to_string t) dir);
        Lwt.return_unit)
    in
    init () >|= fun () ->
    t.listeners <- t.listeners + 1;
    function
    | () ->
        if t.listeners > 0 then t.listeners <- t.listeners - 1;
        if t.listeners <> 0 then Lwt.return_unit
        else (
          Log.debug (fun f -> f "%s: stop listening to %s" (to_string t) dir);
          t.stop_listening ())
end