package irmin
Irmin, a distributed database that follows the same design principles as Git
Install
Dune Dependency
Authors
Maintainers
Sources
irmin-2.6.0.tbz
sha256=1db134221e82c424260a0e206b640fcb82902be35eea4137af2bcd9c98d3ac0f
sha512=b334e5b909563787e58790e4665f78a9f21e0f9f976eb7344cb76cbe7db870506bab193cec206e338ba74457896b2176000c936397cf3d44326507300a8193d6
doc/src/irmin/watch.ml.html
Source file watch.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
(*
 * Copyright (c) 2017 Thomas Gazagnaire <thomas@gazagnaire.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 *)

open! Import
include Watch_intf

let src = Logs.Src.create "irmin.watch" ~doc:"Irmin watch notifications"

module Log = (val Logs.src_log src : Logs.LOG)

(** Default value of {!listen_dir_hook}: prints a diagnostic on stderr and
    fails with [assert false]. *)
let none _ _ =
  Printf.eprintf "Listen hook not set!\n%!";
  assert false

(** The hook used by [listen_dir] to start listening for changes. It is
    [none] until {!set_listen_dir_hook} is called. *)
let listen_dir_hook = ref none

(** Type of listen hooks. In [listen_dir] the hook is applied to the watch
    manager id, the directory name and a per-file callback; the function it
    eventually returns is stored as the manager's [stop_listening]. *)
type hook =
  int -> string -> (string -> unit Lwt.t) -> (unit -> unit Lwt.t) Lwt.t

(** Install [h] as the listen hook. *)
let set_listen_dir_hook (h : hook) = listen_dir_hook := h

(** [id ()] is a fresh generator: each call to the returned function
    increments a private counter and returns its new value (1, 2, 3, ...). *)
let id () =
  let c = ref 0 in
  fun () ->
    incr c;
    !c

(** Generator used to give each watch manager its [id]. *)
let global = id ()

(* Number of notification workers currently accounted for. *)
let workers_r = ref 0

(** Current value of the worker counter. *)
let workers () = !workers_r

(** [scheduler ()] is a pair [(clean, enqueue)].

    - [enqueue f] pushes the job [f] on a stream. The stream and the
      asynchronous worker consuming it (one job at a time, via
      [Lwt_stream.iter_s]) are created lazily, on the first [enqueue]; the
      worker counter is incremented at that point.
    - [clean ()] pushes [None] on the stream (the end-of-stream marker),
      decrements the worker counter and forgets the stream, so that a later
      [enqueue] starts a fresh worker. *)
let scheduler () =
  let p = ref None in
  let niet () = () in
  let c = ref niet in
  let push elt =
    match !p with
    | Some p -> p elt
    | None ->
        let stream, push = Lwt_stream.create () in
        incr workers_r;
        Lwt.async (fun () ->
            (* FIXME: we would like to skip some updates if more recent ones
               are at the back of the queue. *)
            Lwt_stream.iter_s (fun f -> f ()) stream);
        p := Some push;
        (c := fun () -> push None);
        push elt
  in
  let clean () =
    match !p with
    | None ->
        (* No worker was started (or it has already been cleaned): there is
           nothing to stop, and decrementing [workers_r] here would make the
           counter drift below the number of live workers. *)
        ()
    | Some _ ->
        !c ();
        decr workers_r;
        c := niet;
        p := None
  in
  let enqueue v = push (Some v) in
  (clean, enqueue)

module Make (K : sig
  type t

  val t : t Type.t
end) (V : sig
  type t

  val t : t Type.t
end) =
struct
  type key = K.t
  type value = V.t
  type watch = int

  module KMap = Map.Make (struct
    type t = K.t

    let compare = Type.(unstage (compare K.t))
  end)

  module IMap = Map.Make (struct
    type t = int

    let compare (x : int) (y : int) = compare x y
  end)

  type key_handler = value Diff.t -> unit Lwt.t
  type all_handler = key -> value Diff.t -> unit Lwt.t

  let pp_value = Type.pp V.t
  let equal_opt_values = Type.(unstage (equal (option V.t)))
  let equal_keys = Type.(unstage (equal K.t))

  type t = {
    id : int; (* unique watch manager id. *)
    lock : Lwt_mutex.t; (* protect [keys] and [glob]. *)
    mutable next : int; (* next id, to identify watch handlers. *)
    mutable keys : (key * value option * key_handler) IMap.t;
    (* key handlers. *)
    mutable glob : (value KMap.t * all_handler) IMap.t;
    (* global handlers. *)
    enqueue : (unit -> unit Lwt.t) -> unit; (* enqueue notifications. *)
    clean : unit -> unit; (* destroy the notification thread. *)
    mutable listeners : int; (* number of listeners. *)
    mutable stop_listening : unit -> unit Lwt.t;
    (* clean-up listen resources. *)
    mutable notifications : int; (* number of notifications. *)
  }

  (** [stats t] is [(number of key handlers, number of global handlers)]. *)
  let stats t = (IMap.cardinal t.keys, IMap.cardinal t.glob)

  (** Short description of [t] used in log messages: manager id, handler
      counts and number of listeners. *)
  let to_string t =
    let k, a = stats t in
    Printf.sprintf "[%d: %dk/%dg|%d]" t.id k a t.listeners

  (** Return the next handler id of [t] and advance the counter. *)
  let next t =
    let id = t.next in
    t.next <- id + 1;
    id

  (** [is_empty t] holds when [t] has neither key nor global handlers. *)
  let is_empty t = IMap.is_empty t.keys && IMap.is_empty t.glob

  (** Drop every handler and reset the handler id counter to 0. Does not take
      [t.lock] and does not call [t.clean]. *)
  let clear_unsafe t =
    t.keys <- IMap.empty;
    t.glob <- IMap.empty;
    t.next <- 0

  (** Same as {!clear_unsafe}, run while holding [t.lock]. *)
  let clear t =
    Lwt_mutex.with_lock t.lock (fun () ->
        clear_unsafe t;
        Lwt.return_unit)

  (** Create a watch manager with no handlers, no listeners, a fresh id taken
      from [global] and its own scheduler. *)
  let v () =
    let lock = Lwt_mutex.create () in
    let clean, enqueue = scheduler () in
    {
      lock;
      clean;
      enqueue;
      id = global ();
      next = 0;
      keys = IMap.empty;
      glob = IMap.empty;
      listeners = 0;
      stop_listening = (fun () -> Lwt.return_unit);
      notifications = 0;
    }

  (** Remove handler [id] from both handler tables. Does not take [t.lock]. *)
  let unwatch_unsafe t id =
    Log.debug (fun f -> f "unwatch %s: id=%d" (to_string t) id);
    let glob = IMap.remove id t.glob in
    let keys = IMap.remove id t.keys in
    t.glob <- glob;
    t.keys <- keys

  (** Remove handler [id] while holding [t.lock]; when no handler is left
      afterwards, [t.clean] is called. *)
  let unwatch t id =
    Lwt_mutex.with_lock t.lock (fun () ->
        unwatch_unsafe t id;
        if is_empty t then t.clean ();
        Lwt.return_unit)

  (** Build the diff going from [old] to [value]. The [None, None] case is
      not expected: callers only build a diff when the two values differ. *)
  let mk old value =
    match (old, value) with
    | None, None -> assert false
    | Some v, None -> `Removed v
    | None, Some v -> `Added v
    | Some x, Some y -> `Updated (x, y)

  (** [protect f] is [f] with exceptions caught: they are logged at error
      level (with the current backtrace) and replaced by [()]. *)
  let protect f () =
    Lwt.catch f (fun e ->
        Log.err (fun l ->
            l "watch callback got: %a\n%s" Fmt.exn e
              (Printexc.get_backtrace ()));
        Lwt.return_unit)

  let pp_option = Fmt.option ~none:(Fmt.any "<none>")
  let pp_key = Type.pp K.t

  (** Update every global handler with the new [value] of [key]. Handlers
      whose recorded value for [key] differs from [value] get their record
      updated and a callback scheduled; all scheduled callbacks are enqueued
      as a single job that runs them with [Lwt_list.iter_p]. Does not take
      [t.lock]. *)
  let notify_all_unsafe t key value =
    let todo = ref [] in
    let glob =
      IMap.fold
        (fun id ((init, f) as arg) acc ->
          let fire old_value =
            todo :=
              protect (fun () ->
                  Log.debug (fun f ->
                      f "notify-all[%d.%d:%a]: %d firing! (%a -> %a)" t.id id
                        pp_key key t.notifications (pp_option pp_value)
                        old_value (pp_option pp_value) value);
                  t.notifications <- t.notifications + 1;
                  f key (mk old_value value))
              :: !todo;
            (* Record the new state of [key] for this handler. *)
            let init =
              match value with
              | None -> KMap.remove key init
              | Some v -> KMap.add key v init
            in
            IMap.add id (init, f) acc
          in
          let old_value =
            try Some (KMap.find key init) with Not_found -> None
          in
          if equal_opt_values old_value value then (
            Log.debug (fun f ->
                f "notify-all[%d:%d:%a]: same value, skipping." t.id id pp_key
                  key);
            IMap.add id arg acc)
          else fire old_value)
        t.glob IMap.empty
    in
    t.glob <- glob;
    match !todo with
    | [] -> ()
    | ts -> t.enqueue (fun () -> Lwt_list.iter_p (fun x -> x ()) ts)

  (** Same as {!notify_all_unsafe} for key handlers: only handlers registered
      on a key equal to [key], and whose recorded value differs from [value],
      are updated and scheduled. Does not take [t.lock]. *)
  let notify_key_unsafe t key value =
    let todo = ref [] in
    let keys =
      IMap.fold
        (fun id ((k, old_value, f) as arg) acc ->
          if not (equal_keys key k) then IMap.add id arg acc
          else if equal_opt_values value old_value then (
            Log.debug (fun f ->
                f "notify-key[%d.%d:%a]: same value, skipping." t.id id pp_key
                  key);
            IMap.add id arg acc)
          else (
            todo :=
              protect (fun () ->
                  Log.debug (fun f ->
                      f "notify-key[%d:%d:%a] %d firing! (%a -> %a)" t.id id
                        pp_key key t.notifications (pp_option pp_value)
                        old_value (pp_option pp_value) value);
                  t.notifications <- t.notifications + 1;
                  f (mk old_value value))
              :: !todo;
            IMap.add id (k, value, f) acc))
        t.keys IMap.empty
    in
    t.keys <- keys;
    match !todo with
    | [] -> ()
    | ts -> t.enqueue (fun () -> Lwt_list.iter_p (fun x -> x ()) ts)

  (** [notify t key value] tells every handler of [t] that [key] now has
      value [value] ([None] meaning no value). Runs under [t.lock]; does
      nothing when [t] has no handlers. *)
  let notify t key value =
    Lwt_mutex.with_lock t.lock (fun () ->
        if is_empty t then Lwt.return_unit
        else (
          notify_all_unsafe t key value;
          notify_key_unsafe t key value;
          Lwt.return_unit))

  (** Register the key handler [f] on [key], with [init] as the value
      currently recorded for it, and return the new handler id. Does not take
      [t.lock]. *)
  let watch_key_unsafe t key ?init f =
    let id = next t in
    Log.debug (fun f -> f "watch-key %s: id=%d" (to_string t) id);
    t.keys <- IMap.add id (key, init, f) t.keys;
    id

  (** Same as {!watch_key_unsafe}, run while holding [t.lock]. *)
  let watch_key t key ?init f =
    Lwt_mutex.with_lock t.lock (fun () ->
        let id = watch_key_unsafe t ?init key f in
        Lwt.return id)

  (** Build a key map from an association list; on duplicate keys the last
      binding of the list wins. *)
  let kmap_of_alist l =
    List.fold_left (fun map (k, v) -> KMap.add k v map) KMap.empty l

  (** Register the global handler [f], with [init] (default: empty) as the
      recorded bindings, and return the new handler id. Does not take
      [t.lock]. *)
  let watch_unsafe t ?(init = []) f =
    let id = next t in
    Log.debug (fun f -> f "watch %s: id=%d" (to_string t) id);
    t.glob <- IMap.add id (kmap_of_alist init, f) t.glob;
    id

  (** Same as {!watch_unsafe}, run while holding [t.lock]. *)
  let watch t ?init f =
    Lwt_mutex.with_lock t.lock (fun () ->
        let id = watch_unsafe t ?init f in
        Lwt.return id)

  (** [listen_dir t dir ~key ~value] registers one more listener on [dir].

      When [t] had no listener, the listen hook is started with a callback
      that maps each reported file name to a key with [key] (files mapped to
      [None] are ignored), reads its value with [value] and calls {!notify}.
      If [t.notifications] changed while the value was being read, the event
      is considered stale and the value is read again.

      The result is a function that unregisters the listener; when the
      listener count reaches 0 it calls the stop function returned by the
      hook. *)
  let listen_dir t dir ~key ~value =
    let init () =
      if t.listeners = 0 then (
        Log.debug (fun f -> f "%s: start listening to %s" (to_string t) dir);
        let+ f =
          !listen_dir_hook t.id dir (fun file ->
              match key file with
              | None -> Lwt.return_unit
              | Some key ->
                  let rec read n =
                    let* value = value key in
                    let n' = t.notifications in
                    if n = n' then notify t key value
                    else (
                      Log.debug (fun l ->
                          l "Stale event, trying reading again");
                      read n')
                  in
                  read t.notifications)
        in
        t.stop_listening <- f)
      else (
        Log.debug (fun f -> f "%s: already listening on %s" (to_string t) dir);
        Lwt.return_unit)
    in
    init () >|= fun () ->
    t.listeners <- t.listeners + 1;
    function
    | () ->
        if t.listeners > 0 then t.listeners <- t.listeners - 1;
        if t.listeners <> 0 then Lwt.return_unit
        else (
          Log.debug (fun f -> f "%s: stop listening to %s" (to_string t) dir);
          t.stop_listening ())
end
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>