package eio
Effect-based direct-style IO API for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
eio-1.2.tbz
sha256=3792e912bd8d494bb2e38f73081825e4d212b1970cf2c1f1b2966caa9fd6bc40
sha512=4a80dbcf8cf2663bdad0f2970871844f37bd293c56bd1ce602910e0a613754945f1f942719f9630906453be7c73c1732dc97526c6c90b0b36100d04fd5e871e4
doc/src/eio/condition.ml.html
Source file condition.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
(* Import these directly because we copy this file for the dscheck tests. *) module Fiber_context = Eio__core.Private.Fiber_context module Suspend = Eio__core.Private.Suspend module Cancel = Eio__core.Cancel type t = Broadcast.t let create () = Broadcast.create () let lock_protected m = Cancel.protect (fun () -> Eio_mutex.lock m) let await_generic ?mutex t = match Suspend.enter_unchecked "Condition.await" (fun ctx enqueue -> match Fiber_context.get_error ctx with | Some ex -> Option.iter Eio_mutex.unlock mutex; enqueue (Error ex) | None -> match Broadcast.suspend t (fun () -> enqueue (Ok ())) with | None -> Option.iter Eio_mutex.unlock mutex | Some request -> Option.iter Eio_mutex.unlock mutex; Fiber_context.set_cancel_fn ctx (fun ex -> if Broadcast.cancel request then enqueue (Error ex) (* else already succeeded *) ) ) with | () -> Option.iter lock_protected mutex | exception ex -> let bt = Printexc.get_raw_backtrace () in Option.iter lock_protected mutex; Printexc.raise_with_backtrace ex bt let await t mutex = await_generic ~mutex t let await_no_mutex t = await_generic t let broadcast = Broadcast.resume_all type request = Broadcast.request option let register_immediate = Broadcast.suspend let cancel = function | Some request -> Broadcast.cancel request | None -> false let ensure_cancelled x = ignore (cancel x : bool) type state = | Init | Waiting of ((unit, exn) result -> unit) | Done (* There main property want is that we don't suspend forever if a broadcast happened after [fn] started, or if the fiber is cancelled. 1. We start in the Init state. 2. If a broadcast happens here we move to Done. If we later try to suspend, we'll resume immediately. 3. We run [fn]. If a broadcast happens during this we'll transition to Done as before. 4. If [fn] raises or wants to stop normally, we return without suspending at all. 5. Otherwise, we suspend the fiber. 6. We try to transition from Init to Waiting. If a broadcast transitioned to Done before this, we resume immediately. If a broadcast transitions afterwards, [wake] will see the [enqueue] function and wake us. Therefore, we can only sleep forever if a broadcast never happens after starting [fn]. 7. If the fiber is cancelled before suspending, we raise on suspend. If cancelled after suspending and before the request succeeds, we cancel the request and raise. If cancelled after the request succeeds, [wake] will resume us. *) let rec loop_no_mutex t fn = let state = Atomic.make Init in let wake () = match Atomic.exchange state Done with | Init -> () (* Broadcast happened before we suspended; suspend will notice *) | Waiting enqueue -> enqueue (Ok ()) | Done -> assert false in let request = Broadcast.suspend t wake in (* Note: to avoid memory leaks, make sure that [request] is finished in all cases. *) match fn () with | exception ex -> let bt = Printexc.get_raw_backtrace () in ensure_cancelled request; Printexc.raise_with_backtrace ex bt | Some x -> ensure_cancelled request; x | None -> Suspend.enter_unchecked "Condition.loop_no_mutex" (fun ctx enqueue -> match Fiber_context.get_error ctx with | Some ex -> ensure_cancelled request; (* If a broadcast already happened, we still cancel. *) enqueue (Error ex) | None -> let waiting = Waiting enqueue in if Atomic.compare_and_set state Init waiting then ( (* We were in Init, so [wake] hasn't yet done anything. When it runs, it will resume us. We're also not currently cancelled, because we checked above and cancellations only come from the same thread. *) Fiber_context.set_cancel_fn ctx (fun ex -> if cancel request then ( (* We could set the state to Done here, but there's no need; we're not racing with anything now. [wake] never runs. *) enqueue (Error ex) ) (* else we already got resumed *) ) ) else ( (* State is already Done, but [wake] couldn't wake us then because we hadn't moved to [waiting]. Resume now. *) enqueue (Ok ()) ) ); loop_no_mutex t fn
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>