package miou

  1. Overview
  2. Docs
Composable concurrency primitives for OCaml

Install

Dune Dependency

Authors

Maintainers

Sources

miou-0.3.1.tbz
sha256=2b7a2d52ec0599156b6e7c586190cc99dd964d840799763f6f2407bb83e39471
sha512=eba70cb4c5484ef4c5fce522b106d32f20482fe55a9252c82cf7b85d69cd1359d97a9d7279f39c05f3b2365d87cdfec39fbe2a0780167506d1eaeaf618227895

doc/src/miou.sync/miou_sync.ml.html

Source file miou_sync.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
(* Copyright (c) 2023 Vesa Karvonen

   Permission to use, copy, modify, and/or distribute this software for any purpose
   with or without fee is hereby granted, provided that the above copyright notice
   and this permission notice appear in all copies.

   THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
   REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
   FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
   INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
   OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
   TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF
   THIS SOFTWARE.
*)

module Backoff = Miou_backoff

(** One-shot signalling cell stored in a single atomic.

    A trigger starts as [Initial], may get exactly one action attached
    ([Awaiting]) and ends as [Signaled]. No function of this module ever moves
    a trigger out of [Signaled]. *)
module Trigger : sig
  type state =
    | Signaled
    | Awaiting : (t -> 'x -> 'y -> unit) * 'x * 'y -> state
    | Initial

  and t = state Atomic.t

  (** [create ()] is a fresh trigger in the [Initial] state. *)
  val create : unit -> t

  (** [is_initial t] is [true] iff [t] is currently [Initial]. *)
  val is_initial : t -> bool

  (** [is_signaled t] is [true] iff [t] is currently [Signaled]. *)
  val is_signaled : t -> bool

  (** [signal t] moves [t] to [Signaled]. If an action [(fn, x, y)] was
      attached, the caller whose compare-and-set wins runs [fn t x y].
      Signalling an already signaled trigger does nothing. *)
  val signal : t -> unit

  (** [on_signal t x y fn] attaches [fn] (with arguments [x] and [y]) to an
      [Initial] trigger and returns [true]. It returns [false], without
      attaching anything, if [t] is already [Signaled].

      @raise Invalid_argument if an action is already attached to [t]. *)
  val on_signal : t -> 'x -> 'y -> (t -> 'x -> 'y -> unit) -> bool

  type _ Effect.t +=
    private
    | Await : t -> (exn * Printexc.raw_backtrace) option Effect.t

  (** [await t] returns [None] at once if [t] is [Signaled]; if [t] is
      [Initial] it performs the [Await] effect and returns whatever the
      handler resumes with.

      @raise Invalid_argument if an action is already attached to [t]. *)
  val await : t -> (exn * Printexc.raw_backtrace) option
end = struct
  type state =
    | Signaled
    | Awaiting : (t -> 'x -> 'y -> unit) * 'x * 'y -> state
    | Initial

  and t = state Atomic.t

  let create () = Atomic.make Initial

  (* Retry loop: each branch re-reads the state when its compare-and-set
     loses. The attached action runs only in the caller that performed the
     Awaiting -> Signaled transition, hence at most once per attachment. *)
  let rec signal t =
    match Atomic.get t with
    | Signaled -> ()
    | Initial ->
        if not (Atomic.compare_and_set t Initial Signaled) then signal t
    | Awaiting (fn, x, y) as seen ->
        if Atomic.compare_and_set t seen Signaled then fn t x y else signal t

  (* The two queries below use pattern matching rather than polymorphic [=]:
     an [Awaiting] state carries a closure and arbitrary payloads, so a
     structural comparison is both slower and fragile, and an exhaustive match
     forces a review if a constructor is ever added. *)
  let is_signaled t =
    match Atomic.get t with Signaled -> true | Awaiting _ | Initial -> false

  let is_initial t =
    match Atomic.get t with Initial -> true | Signaled | Awaiting _ -> false

  (* Kept out of line so the raising path is not inlined into its callers. *)
  let[@inline never] awaiting () = invalid_arg "Trigger: already awaiting"

  (* [||] retries only when the compare-and-set lost against a concurrent
     update; the state is then read again from scratch. *)
  let rec on_signal t x y fn =
    match Atomic.get t with
    | Initial ->
        Atomic.compare_and_set t Initial (Awaiting (fn, x, y))
        || on_signal t x y fn
    | Signaled -> false
    | Awaiting _ -> awaiting ()

  type _ Effect.t += Await : t -> (exn * Printexc.raw_backtrace) option Effect.t

  let await t =
    match Atomic.get t with
    | Signaled -> None
    | Awaiting _ -> awaiting ()
    | Initial -> Effect.perform (Await t)
end

(** A write-once result cell with a list of triggers to signal on completion.

    A computation is [Continue] while running and becomes either [Returned] or
    [Cancelled]; no function of this module changes it afterwards. *)
module Computation : sig
  type 'a state =
    | Cancelled of exn * Printexc.raw_backtrace
    | Returned of 'a
    | Continue of { balance: int; triggers: Trigger.t list }

  type !'a t = 'a state Atomic.t

  (** [create ()] is a running computation with no attached trigger. *)
  val create : unit -> 'a t

  (** [try_return t v] completes [t] with [v] and signals every attached
      trigger. Returns [false] if [t] had already completed. *)
  val try_return : 'a t -> 'a -> bool

  (** [try_capture t fn x] runs [fn x] and completes [t] with its result, or
      cancels [t] with the exception (and its backtrace) raised by [fn x]. *)
  val try_capture : 'r t -> ('a -> 'r) -> 'a -> bool

  (** [try_cancel t (exn, bt)] cancels [t] and signals every attached trigger.
      Returns [false] if [t] had already completed. *)
  val try_cancel : 'a t -> exn * Printexc.raw_backtrace -> bool

  (** [is_running t] is [true] while [t] is neither returned nor cancelled. *)
  val is_running : 'a t -> bool

  (** [cancelled t] is the cancellation reason of [t], if it was cancelled. *)
  val cancelled : 'a t -> (exn * Printexc.raw_backtrace) option

  (** [raise_if_errored t] re-raises the cancellation exception of [t] with its
      recorded backtrace; it does nothing if [t] is running or returned. *)
  val raise_if_errored : 'a t -> unit

  (** [peek t] is the outcome of [t], or [None] while [t] is running. *)
  val peek : 'a t -> ('a, exn * Printexc.raw_backtrace) result option

  (** [try_attach t trigger] registers [trigger] so that completing [t] signals
      it. Returns [false] if [t] has completed or [trigger] is already
      signaled. *)
  val try_attach : 'a t -> Trigger.t -> bool

  (** [detach t trigger] signals [trigger] and accounts for its removal; the
      entry itself is dropped from the list by a later collection pass. *)
  val detach : 'a t -> Trigger.t -> unit

  (** [clean t] drops every already signaled trigger from a running [t]. *)
  val clean : 'a t -> unit

  (** [await t] returns the outcome of [t], waiting on a fresh trigger while
      [t] is running. If the wait itself yields [Some (exn, bt)], the trigger
      is detached and [Error (exn, bt)] is returned. *)
  val await : 'a t -> ('a, exn * Printexc.raw_backtrace) result

  (** [await_exn t] is like {!await} but re-raises the error with its
      backtrace instead of returning it. *)
  val await_exn : 'a t -> 'a

  (** [canceller ~from ~into] is a trigger that, when signaled, cancels [into]
      with the cancellation reason of [from] if [from] is cancelled. *)
  val canceller : from:'a t -> into:'b t -> Trigger.t
end = struct
  type 'a state =
    | Cancelled of exn * Printexc.raw_backtrace
    | Returned of 'a
    | Continue of { balance: int; triggers: Trigger.t list }

  type 'a t = 'a state Atomic.t

  let create () =
    let initial = Continue { balance= 0; triggers= [] } in
    Atomic.make initial

  let cancelled t =
    match Atomic.get t with
    | Returned _ | Continue _ -> None
    | Cancelled (exn, bt) -> Some (exn, bt)

  open struct
    (* Rebuilds a running state from [pending], keeping only the triggers that
       are not signaled yet. [count] and [live] seed the result; the final
       balance is the number of triggers kept. *)
    let rec gc count live pending =
      match pending with
      | [] -> Continue { balance= count; triggers= live }
      | trigger :: rest ->
          if not (Trigger.is_signaled trigger) then
            gc (count + 1) (trigger :: live) rest
          else gc count live rest
  end

  (* Attaching raises the balance by one. A negative balance means detaches
     have outweighed attaches, so the list is collected instead of grown. *)
  let rec attach_with backoff t trigger =
    match Atomic.get t with
    | Returned _ | Cancelled _ -> false
    | Continue { balance; triggers } as before ->
        (* Check the trigger first so nothing is allocated for a dead one. *)
        if Trigger.is_signaled trigger then false
        else
          let after =
            if balance < 0 then gc 1 [ trigger ] triggers
            else Continue { balance= balance + 1; triggers= trigger :: triggers }
          in
          if Atomic.compare_and_set t before after then true
          else attach_with (Backoff.once backoff) t trigger

  let try_attach t trigger = attach_with Backoff.default t trigger

  (* Detaching lowers the balance by two without touching the list; once the
     balance is already negative the signaled entries are collected. *)
  let rec detach_with backoff t =
    match Atomic.get t with
    | Returned _ | Cancelled _ -> ()
    | Continue { balance; triggers } as before ->
        let after =
          if balance < 0 then gc 0 [] triggers
          else Continue { balance= balance - 2; triggers }
        in
        if Atomic.compare_and_set t before after then ()
        else detach_with (Backoff.once backoff) t

  let detach t trigger =
    (* Signal first: the collection pass only drops signaled triggers. *)
    Trigger.signal trigger;
    detach_with Backoff.default t

  let rec clean_with backoff t =
    match Atomic.get t with
    | Returned _ | Cancelled _ -> ()
    | Continue { triggers; _ } as before ->
        let after = gc 0 [] triggers in
        if Atomic.compare_and_set t before after then ()
        else clean_with (Backoff.once backoff) t

  let clean t = clean_with Backoff.default t

  let is_running t =
    match Atomic.get t with
    | Continue _ -> true
    | Returned _ | Cancelled _ -> false

  open struct
    (* Installs the terminal state [final]; only the caller whose
       compare-and-set wins signals the triggers that were attached. *)
    let rec try_terminate backoff t final =
      match Atomic.get t with
      | Returned _ | Cancelled _ -> false
      | Continue { triggers; _ } as before ->
          if not (Atomic.compare_and_set t before final) then
            try_terminate (Backoff.once backoff) t final
          else begin
            List.iter Trigger.signal triggers;
            true
          end
  end

  let try_return t value =
    let final = Returned value in
    try_terminate Backoff.default t final

  let try_cancel t (exn, bt) =
    let final = Cancelled (exn, bt) in
    try_terminate Backoff.default t final

  let try_capture t fn x =
    match fn x with
    | exception exn ->
        (* Grab the backtrace before anything else can overwrite it. *)
        let bt = Printexc.get_raw_backtrace () in
        try_cancel t (exn, bt)
    | y -> try_return t y

  let raise_if_errored t =
    match Atomic.get t with
    | Returned _ | Continue _ -> ()
    | Cancelled (exn, bt) -> Printexc.raise_with_backtrace exn bt

  let peek t =
    match Atomic.get t with
    | Continue _ -> None
    | Returned v -> Some (Ok v)
    | Cancelled (exn, bt) -> Some (Error (exn, bt))

  open struct
    (* Trigger action used by [canceller]; the trigger argument is unused. *)
    let propagate _trigger from into =
      match Atomic.get from with
      | Cancelled (exn, bt) -> ignore (try_cancel into (exn, bt))
      | Returned _ | Continue _ -> ()
  end

  let canceller ~from ~into =
    (* Built directly in the Awaiting state, with the action pre-installed. *)
    let armed = Trigger.Awaiting (propagate, from, into) in
    Atomic.make armed

  let rec await t =
    match Atomic.get t with
    | Cancelled (exn, bt) -> Error (exn, bt)
    | Returned value -> Ok value
    | Continue _ -> (
        let trigger = Trigger.create () in
        (* A failed attach means the state changed under us: look again. *)
        if not (try_attach t trigger) then await t
        else
          match Trigger.await trigger with
          | Some (exn, bt) ->
              detach t trigger;
              Error (exn, bt)
          | None -> await t)

  let await_exn t =
    match await t with
    | Error (exn, bt) -> Printexc.raise_with_backtrace exn bt
    | Ok value -> value
end
OCaml

Innovation. Community. Security.