package eio
Effect-based direct-style IO API for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
eio-1.2.tbz
sha256=3792e912bd8d494bb2e38f73081825e4d212b1970cf2c1f1b2966caa9fd6bc40
sha512=4a80dbcf8cf2663bdad0f2970871844f37bd293c56bd1ce602910e0a613754945f1f942719f9630906453be7c73c1732dc97526c6c90b0b36100d04fd5e871e4
doc/src/eio.core/fiber.ml.html
Source file fiber.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
[@@@alert "-unstable"] type _ Effect.t += Fork : Cancel.fiber_context * (unit -> unit) -> unit Effect.t let yield () = let fiber = Suspend.enter "" (fun fiber enqueue -> enqueue (Ok fiber)) in Cancel.check fiber.cancel_context (* Note: [f] must not raise an exception, as that would terminate the whole scheduler. *) let fork_raw new_fiber f = Effect.perform (Fork (new_fiber, f)) let fork ~sw f = Switch.check_our_domain sw; if Cancel.is_on sw.cancel then ( let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in fork_raw new_fiber @@ fun () -> Switch.with_op sw @@ fun () -> try f () with ex -> let bt = Printexc.get_raw_backtrace () in Switch.fail ~bt sw ex; (* The [with_op] ensures this will succeed *) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_daemon ~sw f = Switch.check_our_domain sw; if Cancel.is_on sw.cancel then ( let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.cancel ~vars in fork_raw new_fiber @@ fun () -> Switch.with_daemon sw @@ fun () -> match f () with | `Stop_daemon -> (* The daemon asked to stop. *) () | exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) -> (* The daemon was cancelled because all non-daemon fibers are finished. *) () | exception ex -> let bt = Printexc.get_raw_backtrace () in Switch.fail ~bt sw ex; (* The [with_daemon] ensures this will succeed *) ) (* else the fiber should report the error to [sw], but [sw] is failed anyway *) let fork_promise ~sw f = Switch.check_our_domain sw; let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match Switch.with_op sw f with | x -> Promise.resolve_ok r x | exception ex -> Promise.resolve_error r ex (* Can't fail; only we have [r] *) ); p (* This is not exposed. On failure it fails [sw], but you need to make sure that any fibers waiting on the promise will be cancelled. *) let fork_promise_exn ~sw f = Switch.check_our_domain sw; let vars = Cancel.Fiber_context.get_vars () in let new_fiber = Cancel.Fiber_context.make ~cc:sw.Switch.cancel ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match Switch.with_op sw f with | x -> Promise.resolve r x | exception ex -> let bt = Printexc.get_raw_backtrace () in Switch.fail ~bt sw ex (* The [with_op] ensures this will succeed *) ); p (* Like [List.iter (fork ~sw)], but runs the last one in the current fiber for efficiency and less cluttered traces. *) let rec forks ~sw = function | [] -> () | [x] -> Switch.check sw; x () | x :: xs -> fork ~sw x; forks ~sw xs let all xs = Switch.run ~name:"all" @@ fun sw -> forks ~sw xs let both f g = Switch.run ~name:"both" @@ fun sw -> forks ~sw [f; g] let pair f g = Switch.run ~name:"pair" @@ fun sw -> let x = fork_promise ~sw f in let y = g () in (Promise.await_exn x, y) exception Not_first let await_cancel () = Suspend.enter "await_cancel" @@ fun fiber enqueue -> Cancel.Fiber_context.set_cancel_fn fiber (fun ex -> enqueue (Error ex)) type 'a any_status = | New | Ex of (exn * Printexc.raw_backtrace) | OK of 'a let any_gen ~return ~combine fs = let r = ref New in let parent_c = Cancel.sub_unchecked Any (fun cc -> let wrap h = match h () with | x -> begin match !r with | New -> r := OK (return x); Cancel.cancel cc Not_first | OK prev -> r := OK (combine prev x) | Ex _ -> () end | exception Cancel.Cancelled _ when not (Cancel.is_on cc) -> (* If this is in response to us asking the fiber to cancel then we can just ignore it. If it's in response to our parent context being cancelled (which also cancels [cc]) then we'll check that context and raise it at the end anyway. *) () | exception ex -> begin match !r with | New -> r := Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex | OK _ -> r := Ex (ex, Printexc.get_raw_backtrace ()) | Ex prev -> let bt = Printexc.get_raw_backtrace () in r := Ex (Exn.combine prev (ex, bt)) end in let vars = Cancel.Fiber_context.get_vars () in let rec aux = function | [] -> await_cancel () | [f] -> wrap f; [] | f :: fs -> let new_fiber = Cancel.Fiber_context.make ~cc ~vars in let p, r = Promise.create_with_id (Cancel.Fiber_context.tid new_fiber) in fork_raw new_fiber (fun () -> match wrap f with | () -> Promise.resolve_ok r () | exception ex -> Promise.resolve_error r ex ); p :: aux fs in let ps = aux fs in Cancel.protect (fun () -> List.iter Promise.await_exn ps) ) in match !r, Cancel.get_error parent_c with | OK r, None -> r | (OK _ | New), Some ex -> raise ex | Ex (ex, bt), None -> Printexc.raise_with_backtrace ex bt | Ex ex1, Some ex2 -> let bt2 = Printexc.get_raw_backtrace () in let ex, bt = Exn.combine ex1 (ex2, bt2) in Printexc.raise_with_backtrace ex bt | New, None -> assert false let n_any fs = List.rev (any_gen fs ~return:(fun x -> [x]) ~combine:(fun xs x -> x :: xs)) let any ?(combine=(fun x _ -> x)) fs = any_gen fs ~return:Fun.id ~combine let first ?combine f g = any ?combine [f; g] let is_cancelled () = let ctx = Effect.perform Cancel.Get_context in not (Cancel.is_on ctx.cancel_context) let check () = let ctx = Effect.perform Cancel.Get_context in Cancel.check ctx.cancel_context (* Some concurrent list operations *) module List = struct let opt_cons x xs = match x with | None -> xs | Some x -> x :: xs module Limiter : sig (** This is a bit like using a semaphore, but it assumes that there is only a single fiber using it. e.g. you must not call {!use}, {!fork}, etc from two different fibers. *) type t val create : sw:Switch.t -> int -> t (** [create ~sw n] is a limiter that allows running up to [n] jobs at once. *) val use : t -> ('a -> 'b) -> 'a -> 'b (** [use t fn x] runs [fn x] in this fiber, counting it as one use of [t]. *) val fork : t -> ('a -> unit) -> 'a -> unit (** [fork t fn x] runs [fn x] in a new fibre, once a fiber is free. *) val fork_promise_exn : t -> ('a -> 'b) -> 'a -> 'b Promise.t (** [fork_promise_exn t fn x] runs [fn x] in a new fibre, once a fiber is free, and returns a promise for the result. *) end = struct type t = { mutable free_fibers : int; cond : unit Single_waiter.t; sw : Switch.t; } let max_fibers_err n = Fmt.failwith "max_fibers must be positive (got %d)" n let create ~sw max_fibers = if max_fibers <= 0 then max_fibers_err max_fibers; { free_fibers = max_fibers; cond = Single_waiter.create (); sw; } let await_free t = if t.free_fibers = 0 then Single_waiter.await t.cond "Limiter.await_free" t.sw.cancel.id; (* If we got woken up then there was a free fiber then. And since we're the only fiber that uses [t], and we were sleeping, it must still be free. *) assert (t.free_fibers > 0); t.free_fibers <- t.free_fibers - 1 let release t = t.free_fibers <- t.free_fibers + 1; if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond let use t fn x = await_free t; let r = fn x in release t; r let fork_promise_exn t fn x = await_free t; fork_promise_exn ~sw:t.sw (fun () -> let r = fn x in release t; r) let fork t fn x = await_free t; fork ~sw:t.sw (fun () -> fn x; release t) end let filter_map ?(max_fibers=max_int) fn items = match items with | [] -> [] (* Avoid creating a switch in the simple case *) | items -> Switch.run ~name:"filter_map" @@ fun sw -> let limiter = Limiter.create ~sw max_fibers in let rec aux = function | [] -> [] | [x] -> Option.to_list (Limiter.use limiter fn x) | x :: xs -> let x = Limiter.fork_promise_exn limiter fn x in let xs = aux xs in opt_cons (Promise.await x) xs in aux items let map ?max_fibers fn = filter_map ?max_fibers (fun x -> Some (fn x)) let filter ?max_fibers fn = filter_map ?max_fibers (fun x -> if fn x then Some x else None) let iter ?(max_fibers=max_int) fn items = match items with | [] -> () (* Avoid creating a switch in the simple case *) | items -> Switch.run ~name:"iter" @@ fun sw -> let limiter = Limiter.create ~sw max_fibers in let rec aux = function | [] -> () | [x] -> Limiter.use limiter fn x | x :: xs -> Limiter.fork limiter fn x; aux xs in aux items end type 'a key = 'a Hmap.key let create_key () = Hmap.Key.create () let get key = Hmap.find key (Cancel.Fiber_context.get_vars ()) let with_binding var value fn = let ctx = Effect.perform Cancel.Get_context in Cancel.Fiber_context.with_vars ctx (Hmap.add var value ctx.vars) fn let without_binding var fn = let ctx = Effect.perform Cancel.Get_context in Cancel.Fiber_context.with_vars ctx (Hmap.rem var ctx.vars) fn (* Coroutines. [fork_coroutine ~sw fn] creates a new fiber for [fn]. [fn] immediately suspends, setting its state to [Ready enqueue]. A consumer can resume it by setting the state to [Running] and calling [enqueue], while suspending itself. The consumer passes in its own [enqueue] function. They run alternatively like this, switching between the [Ready] and [Running] states. To finish, the coroutine fiber can set the state to [Finished] or [Failed], or the client can set the state to [Client_cancelled]. *) (* Note: we could easily generalise this to [('in, 'out) coroutine] if that was useful. *) type 'out coroutine = [ `Init | `Ready of [`Running of 'out Suspend.enqueue] Suspend.enqueue | `Running of 'out Suspend.enqueue | `Finished | `Client_cancelled of exn | `Failed of exn ] (* The only good reason for the state to change while the coroutine is running is if the client cancels. Return the exception in that case. If the coroutine is buggy it might e.g. fork two fibers and yield twice for a single request - return Invalid_argument in that case. *) let unwrap_cancelled state = match Atomic.get state with | `Client_cancelled ex -> ex | `Finished | `Failed _ -> Invalid_argument "Coroutine has already stopped!" | `Ready _ -> Invalid_argument "Coroutine has already yielded!" | `Init | `Running _ -> Invalid_argument "Coroutine in unexpected state!" let run_coroutine ~state fn = let await_request ~prev ~on_suspend = (* Suspend and wait for the consumer to resume us: *) Suspend.enter "await-consumer" (fun ctx enqueue -> let ready = `Ready enqueue in if Atomic.compare_and_set state prev ready then ( Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> if Atomic.compare_and_set state ready (`Failed ex) then enqueue (Error ex); (* else the client enqueued a resume for us; handle that instead *) ); on_suspend () ) else ( enqueue (Error (unwrap_cancelled state)) ) ) in let current_state = ref (await_request ~prev:`Init ~on_suspend:ignore) in fn (fun v -> (* The coroutine wants to yield the value [v] and suspend. *) let `Running enqueue as prev = !current_state in current_state := await_request ~prev ~on_suspend:(fun () -> enqueue (Ok (Some v))) ); (* [fn] has finished. End the stream. *) if Atomic.compare_and_set state (!current_state :> _ coroutine) `Finished then ( let `Running enqueue = !current_state in enqueue (Ok None) ) else ( raise (unwrap_cancelled state) ) let fork_coroutine ~sw fn = let state = Atomic.make `Init in fork_daemon ~sw (fun () -> try run_coroutine ~state fn; `Stop_daemon with ex -> match ex, Atomic.exchange state (`Failed ex) with | _, `Running enqueue -> (* A client is waiting for us. Send the error there. Also do this if we were cancelled. *) enqueue (Error ex); `Stop_daemon | Cancel.Cancelled _, _ -> (* The client isn't waiting (probably it got cancelled, then we tried to yield to it and got cancelled too). If it tries to resume us later it will see the error. *) `Stop_daemon | _ -> (* Something unexpected happened. Re-raise. *) raise ex ); fun () -> Suspend.enter "await-producer" (fun ctx enqueue -> let rec aux () = match Atomic.get state with | `Ready resume as prev -> let running = `Running enqueue in if Atomic.compare_and_set state prev running then ( resume (Ok running); Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> if Atomic.compare_and_set state running (`Client_cancelled ex) then enqueue (Error ex) ) ) else aux () | `Finished -> enqueue (Error (Invalid_argument "Coroutine has already finished!")) | `Failed ex | `Client_cancelled ex -> enqueue (Error (Invalid_argument ("Coroutine has already failed: " ^ Printexc.to_string ex))) | `Running _ -> enqueue (Error (Invalid_argument "Coroutine is still running!")) | `Init -> assert false in aux () ) let fork_seq ~sw fn = Seq.of_dispenser (fork_coroutine ~sw fn)
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>