package eio
Effect-based direct-style IO API for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
eio-0.7.tbz
sha256=675e67f343ccf37b965d15d1ee1c639d7a06431e8f08e95559133419f3488ee1
sha512=3d1bd0e5e0aa79d8858d83944d734a0efc325ed66a12a1506c3b36281db56c0216e6cb90a46e6021db1ea34cdd2567ebabe0bd687d9989495bb7cf6099e90ba7
doc/src/eio.core/fiber.ml.html
Source file fiber.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t let yield () = let fiber = Suspend.enter (fun fiber enqueue -> enqueue (Ok fiber)) in Cancel.check fiber.cancel_context (* Note: [f] must not raise an exception, as that would terminate the whole scheduler. *) let fork_raw new_fiber f = Effect.perform (Fork (new_fiber, f)) let fork ~sw f = Switch.check_our_domain sw; if Cancel.is_on sw.cancel then ( let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in fork_raw new_fiber @@ fun () -> Switch.with_op sw @@ fun () -> match f () with | () -> Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception ex -> Switch.fail sw ex; (* The [with_op] ensures this will succeed *) Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_daemon ~sw f = Switch.check_our_domain sw; if Cancel.is_on sw.cancel then ( let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in fork_raw new_fiber @@ fun () -> Switch.with_daemon sw @@ fun () -> match f () with | `Stop_daemon -> (* The daemon asked to stop. *) Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) -> (* The daemon was cancelled because all non-daemon fibers are finished. *) Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None | exception ex -> Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *) Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_promise ~sw f = Switch.check_our_domain sw; let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match Switch.with_op sw f with | x -> Promise.resolve_ok r x | exception ex -> Promise.resolve_error r ex (* Can't fail; only we have [r] *) ); p (* This is not exposed. On failure it fails [sw], but you need to make sure that any fibers waiting on the promise will be cancelled. *) let fork_promise_exn ~sw f = Switch.check_our_domain sw; let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match Switch.with_op sw f with | x -> Promise.resolve r x | exception ex -> Switch.fail sw ex (* The [with_op] ensures this will succeed *) ); p let all xs = Switch.run @@ fun sw -> List.iter (fork ~sw) xs let both f g = all [f; g] let pair f g = Switch.run @@ fun sw -> let x = fork_promise ~sw f in let y = g () in (Promise.await_exn x, y) let fork_sub ~sw ~on_error f = fork ~sw (fun () -> try Switch.run f with | ex when Cancel.is_on sw.cancel -> (* Typically the caller's context is within [sw], but it doesn't have to be. It's possible that the original context has finished by now, but [fork] is keeping [sw] alive so we can use that report the error. *) Switch.run_in sw @@ fun () -> try on_error ex with ex2 -> (* The [run_in] ensures [adopting_sw] isn't finished here *) Switch.fail sw ex; Switch.fail sw ex2 ) exception Not_first let await_cancel () = Suspend.enter @@ fun fiber enqueue -> Cancel.Fiber_context.set_cancel_fn fiber (fun ex -> enqueue (Error ex)) let any fs = let r = ref `None in let parent_c = Cancel.sub_unchecked (fun cc -> let wrap h = match h () with | x -> begin match !r with | `None -> r := `Ok x; Cancel.cancel cc Not_first | `Ex _ | `Ok _ -> () end | exception Cancel.Cancelled _ when not (Cancel.is_on cc) -> (* If this is in response to us asking the fiber to cancel then we can just ignore it. If it's in response to our parent context being cancelled (which also cancels [cc]) then we'll check that context and raise it at the end anyway. *) () | exception ex -> begin match !r with | `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex | `Ok _ -> r := `Ex (ex, Printexc.get_raw_backtrace ()) | `Ex prev -> let bt = Printexc.get_raw_backtrace () in r := `Ex (Exn.combine prev (ex, bt)) end in let vars = Cancel.Fiber_context.get_vars () in let rec aux = function | [] -> await_cancel () | [f] -> wrap f; [] | f :: fs -> let new_fiber = Cancel.Fiber_context.make ~cc ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match wrap f with | x -> Promise.resolve_ok r x | exception ex -> Promise.resolve_error r ex ); p :: aux fs in let ps = aux fs in Cancel.protect (fun () -> List.iter Promise.await_exn ps) ) in match !r, Cancel.get_error parent_c with | `Ok r, None -> r | (`Ok _ | `None), Some ex -> raise ex | `Ex (ex, bt), None -> Printexc.raise_with_backtrace ex bt | `Ex ex1, Some ex2 -> let bt2 = Printexc.get_raw_backtrace () in let ex, bt = Exn.combine ex1 (ex2, bt2) in Printexc.raise_with_backtrace ex bt | `None, None -> assert false let first f g = any [f; g] let check () = let ctx = Effect.perform Cancel.Get_context in Cancel.check ctx.cancel_context (* Some concurrent list operations *) module List = struct let opt_cons x xs = match x with | None -> xs | Some x -> x :: xs module Limiter : sig (** This is a bit like using a semaphore, but it assumes that there is only a single fiber using it. e.g. you must not call {!use}, {!fork}, etc from two different fibers. *) type t val create : sw:Switch.t -> int -> t (** [create ~sw n] is a limiter that allows running up to [n] jobs at once. *) val use : t -> ('a -> 'b) -> 'a -> 'b (** [use t fn x] runs [fn x] in this fiber, counting it as one use of [t]. *) val fork : t -> ('a -> unit) -> 'a -> unit (** [fork t fn x] runs [fn x] in a new fibre, once a fiber is free. *) val fork_promise_exn : t -> ('a -> 'b) -> 'a -> 'b Promise.t (** [fork_promise_exn t fn x] runs [fn x] in a new fibre, once a fiber is free, and returns a promise for the result. *) end = struct type t = { mutable free_fibers : int; cond : unit Single_waiter.t; sw : Switch.t; } let max_fibers_err n = Fmt.failwith "max_fibers must be positive (got %d)" n let create ~sw max_fibers = if max_fibers <= 0 then max_fibers_err max_fibers; { free_fibers = max_fibers; cond = Single_waiter.create (); sw; } let await_free t = if t.free_fibers = 0 then Single_waiter.await t.cond t.sw.id; (* If we got woken up then there was a free fiber then. And since we're the only fiber that uses [t], and we were sleeping, it must still be free. *) assert (t.free_fibers > 0); t.free_fibers <- t.free_fibers - 1 let release t = t.free_fibers <- t.free_fibers + 1; if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ()) let use t fn x = await_free t; let r = fn x in release t; r let fork_promise_exn t fn x = await_free t; fork_promise_exn ~sw:t.sw (fun () -> let r = fn x in release t; r) let fork t fn x = await_free t; fork ~sw:t.sw (fun () -> fn x; release t) end let filter_map ?(max_fibers=max_int) fn items = match items with | [] -> [] (* Avoid creating a switch in the simple case *) | items -> Switch.run @@ fun sw -> let limiter = Limiter.create ~sw max_fibers in let rec aux = function | [] -> [] | [x] -> Option.to_list (Limiter.use limiter fn x) | x :: xs -> let x = Limiter.fork_promise_exn limiter fn x in let xs = aux xs in opt_cons (Promise.await x) xs in aux items let map ?max_fibers fn = filter_map ?max_fibers (fun x -> Some (fn x)) let filter ?max_fibers fn = filter_map ?max_fibers (fun x -> if fn x then Some x else None) let iter ?(max_fibers=max_int) fn items = match items with | [] -> () (* Avoid creating a switch in the simple case *) | items -> Switch.run @@ fun sw -> let limiter = Limiter.create ~sw max_fibers in let rec aux = function | [] -> () | [x] -> Limiter.use limiter fn x | x :: xs -> Limiter.fork limiter fn x; aux xs in aux items end include List type 'a key = 'a Hmap.key let create_key () = Hmap.Key.create () let get key = Hmap.find key (Cancel.Fiber_context.get_vars ()) let with_binding var value fn = let ctx = Effect.perform Cancel.Get_context in Cancel.Fiber_context.with_vars ctx (Hmap.add var value ctx.vars) fn let without_binding var fn = let ctx = Effect.perform Cancel.Get_context in Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>