package eio
Effect-based direct-style IO API for OCaml
Install
Dune Dependency
Authors
Maintainers
Sources
eio-0.12.tbz
sha256=d84847ce85ffb78641496ad24be3c6ab5cc2c6885cedad6ae257ecac59d926a0
sha512=fbcbc8e7e8eaaeacd6c7b3be04fec19b356f900307b2cc1bf6c1cd6bd538c4ea59ab2c7d936fac00c52a3277737671759f1584025c24e0a7727447609c633821
doc/src/eio.core/broadcast.ml.html
Source file broadcast.ml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
(* A lock-free broadcast: any number of waiters register a callback, and
   [resume_all] wakes every waiter registered so far.

   See the Cells module for an overview of this system.

   Each new waiter atomically increments the "suspend" pointer and writes a
   callback there. The waking fiber removes all the callbacks and calls them.
   In this version, "resume" never gets ahead of "suspend" (broadcasting just
   brings it up-to-date with the "suspend" pointer).

   When the resume fiber runs, some of the cells reserved for callbacks might
   not yet have been filled. In this case, the resuming fiber just marks them
   as needing to be resumed. When the suspending fiber continues, it will
   notice this and continue immediately. *)

module Cell = struct
  (* For any given cell, there are two actors running in parallel: the
     suspender (called the "consumer" below) and the resumer (called the
     "provider" below).

     The resumer only performs a single operation (resume).

     The consumer waits to be resumed and then, optionally, cancels.

     This means we only have three cases to think about:

     1. Consumer adds request (Empty -> Request).
     1a. Provider fulfills it (Request -> Resumed).
     1b. Consumer cancels it (Request -> Cancelled).
     2. Provider gets to cell first (Empty -> Resumed).
        When the consumer tries to wait, it resumes immediately.

     The Resumed state should never be seen by the resumer. It exists only to
     allow the request to be GC'd promptly. We could replace it with Empty,
     but having separate states is clearer for debugging.

     NOTE(review): the consumer side does observe Resumed; both [suspend] and
     [cancel] below match on it. Only [resume] treats it as unreachable. *)

  (* State of one cell. The type parameter is unused by the constructors;
     this file always instantiates it with [unit]. *)
  type _ t =
    | Request of (unit -> unit)   (* A waiter's callback is stored here *)
    | Cancelled                   (* The waiter withdrew its request *)
    | Resumed                     (* The provider has handled this cell *)
    | Empty                       (* Nothing stored yet *)

  (* The value [Cell] supplies to [Cells.Make] as its initial cell state. *)
  let init = Empty

  (* Segment sizing parameter consumed by [Cells.Make].
     NOTE(review): presumably log2 of the number of cells per segment;
     confirm against the Cells module. *)
  let segment_order = 2

  (* Print the name of the cell's state to formatter [f]. The callback held
     by [Request] is not printed. *)
  let dump f = function
    | Request _ -> Fmt.string f "Request"
    | Empty -> Fmt.string f "Empty"
    | Resumed -> Fmt.string f "Resumed"
    | Cancelled -> Fmt.string f "Cancelled"
end

module Cells = Cells.Make(Cell)

(* A single cell state, with the unused type parameter fixed to [unit]. *)
type cell = unit Cell.t

(* The broadcast itself: a [Cells.t] holding the cells. *)
type t = unit Cells.t

(* Handle returned by [suspend] and accepted by [cancel]: the segment that
   owns the cell, together with the atomic cell itself. *)
type request = unit Cells.segment * cell Atomic.t

(* [resume cell] is the provider's single operation on [cell].

   - If a callback is stored, mark the cell Resumed and call the callback.
   - If the cell is still Empty, mark it Resumed so that the consumer resumes
     itself when it arrives.
   - If the request was cancelled, do nothing.

   Each transition is a compare-and-set, so a concurrent consumer action is
   detected rather than overwritten. *)
let rec resume cell =
  match (Atomic.get cell : cell) with
  | Request r as cur ->
    (* The common case: we have a waiter for the value *)
    if Atomic.compare_and_set cell cur Resumed then r ();
    (* else it was cancelled at the same time; ignore *)
  | Empty ->
    (* The consumer has reserved this cell but not yet stored the request.
       We place Resumed there and it will handle it soon. *)
    if Atomic.compare_and_set cell Empty Resumed then
      ()                (* The consumer will deal with it *)
    else
      resume cell       (* The Request was added concurrently; use it *)
  | Cancelled -> ()
  | Resumed ->
    (* This state is unreachable because we (the provider) haven't set this yet *)
    assert false

(* [cancel request] tries to withdraw a request previously returned by
   [suspend].

   Returns [true] if the cell moved from Request to Cancelled, in which case
   [Cells.cancel_cell] is also called on the owning segment. Returns [false]
   if the request had already been resumed, whether that was seen on the
   initial read or by losing the compare-and-set.

   Raises [Invalid_argument] if the request was already cancelled. *)
let cancel (segment, cell) =
  match (Atomic.get cell : cell) with
  | Request _ as old ->
    if Atomic.compare_and_set cell old Cancelled then (
      Cells.cancel_cell segment;
      true
    ) else false          (* We got resumed first *)
  | Resumed -> false      (* We got resumed first *)
  | Cancelled -> invalid_arg "Already cancelled!"
  | Empty ->
    (* To call [cancel] the user needs a [request] value,
       which they only get once we've reached the [Request] state.
       [Empty] is unreachable from [Request]. *)
    assert false

(* [suspend t k] takes the next suspend cell of [t] and tries to store [k]
   in it.

   Returns [Some request] if [k] was stored; [request] can later be passed to
   [cancel]. Returns [None] if the provider had already marked the cell
   Resumed; in that case [k ()] is called here, before returning. *)
let suspend t k =
  let (_, cell) as request = Cells.next_suspend t in
  if Atomic.compare_and_set cell Empty (Request k) then Some request
  else match Atomic.get cell with
    | Resumed ->
      (* Resumed before we could add the waiter *)
      k ();
      None
    | Cancelled | Request _ | Empty ->
      (* These are unreachable from the previously-observed non-Empty state
         without us taking some action first *)
      assert false

(* [resume_all t] hands [resume] to [Cells.resume_all].
   NOTE(review): presumably this applies [resume] to every cell up to the
   current suspend pointer, as the header comment describes; confirm against
   the Cells module. *)
let resume_all t =
  Cells.resume_all t resume

(* Constructor for a broadcast; an alias for [Cells.make]. *)
let create = Cells.make

(* [dump f t] prints [t] to formatter [f] by delegating to [Cells.dump]. *)
let dump f t = Cells.dump f t
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>