package picos_std

  1. Overview
  2. Docs

Source file bundle.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
open Picos

(** Raises [Invalid_argument "already completed"]. Marked [@inline never] so
    the raising code is never inlined into its callers. *)
let[@inline never] completed () = raise (Invalid_argument "already completed")

(** Representation shared by "no bundle" and "a bundle". The type index is a
    polymorphic variant, so a type can admit both constructors (as [flock_key]
    does) or only [Bundle] (as [t] does). *)
type _ tdt =
  (* Placeholder used as the [~default] when reading [flock_key] and as the
     saved outer value when the bundle is passed as an argument. *)
  | Nothing : [> `Nothing ] tdt
  | Bundle : {
      (* Packed state word: flag bit, callstack bits, and a count of
         outstanding users in units of [config_one]. *)
      config : int Atomic.t;
      (* The computation that fibers of this bundle run on or are attached
         to; canceling it is how the bundle is terminated. *)
      bundle : Computation.packed;
      (* Exceptions pushed by [error]; checked by [await]. *)
      errors : Control.Errors.t;
      (* Signaled by [decr] when the count reaches zero; awaited by [await]. *)
      finished : Trigger.t;
    }
      -> [> `Bundle ] tdt

(* Layout of the [config] word:
   - bit 0       : "terminate on return" flag
   - bits 1 to 5 : requested callstack depth
   - bits 6 up   : count of outstanding users, in units of [config_one] *)

(* Flag bit (value [0x01]). *)
let config_terminated_bit = 1

(* Position of the lowest callstack bit. *)
let config_callstack_shift = 1

(* Five callstack bits starting at [config_callstack_shift] (value [0x3E]). *)
let config_callstack_mask = 0x1F lsl config_callstack_shift

(* One unit of the count (value [0x40]); memory runs out before overflow. *)
let config_one = 1 lsl 6

(** Fiber local storage key under which the current bundle is kept when it is
    passed implicitly ([FLS]) rather than as an argument. The stored value may
    be either [Bundle _] or [Nothing]. *)
let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()

(** A bundle: the [tdt] representation restricted to the [Bundle]
    constructor, so values of this type can never be [Nothing]. *)
type t = [ `Bundle ] tdt

(** [terminate ?callstack t] cancels the computation of bundle [t] with
    [Control.Terminate]. The backtrace passed along is whatever
    [Control.get_callstack_opt callstack] produces. *)
let terminate ?callstack (Bundle r : t) =
  let (Packed computation) = r.bundle in
  let bt = Control.get_callstack_opt callstack in
  Computation.cancel computation Control.Terminate bt

(** [terminate_after ?callstack t ~seconds] is the delayed counterpart of
    [terminate]: it hands the computation of bundle [t] to
    [Computation.cancel_after] with the given [~seconds], [Control.Terminate]
    as the exception, and the backtrace produced by
    [Control.get_callstack_opt callstack]. *)
let terminate_after ?callstack (Bundle r : t) ~seconds =
  let (Packed computation) = r.bundle in
  let bt = Control.get_callstack_opt callstack in
  Computation.cancel_after computation ~seconds Control.Terminate bt

(** [error ?callstack t exn bt] reports a failure to bundle [t]: the bundle is
    terminated and [(exn, bt)] is pushed onto its error collection.

    Nothing happens when [exn] is physically equal to [Control.Terminate]. *)
let error ?callstack (Bundle { errors; _ } as t : t) exn bt =
  if exn == Control.Terminate then ()
  else begin
    (* Terminate first, then record the error. *)
    terminate ?callstack t;
    Control.Errors.push errors exn bt
  end

(** [decr t] removes one unit from the count stored in the [config] word of
    bundle [t]. When the count held at most one unit before the subtraction,
    the bundle's computation is canceled with [Control.Terminate] (and an
    empty backtrace) and the [finished] trigger is signaled. *)
let decr (Bundle { config; bundle = Packed computation; finished; _ } : t) =
  (* [fetch_and_add] yields the value held before the subtraction. *)
  let before = Atomic.fetch_and_add config (-config_one) in
  if before < config_one * 2 then begin
    Computation.cancel computation Control.Terminate Control.empty_bt;
    Trigger.signal finished
  end

(** How a bundle is handed to user code. The type index is the argument type
    the user function receives: with [FLS] the bundle is stored under
    [flock_key] and the function receives [unit]; with [Arg] the function
    receives the bundle itself. *)
type _ pass = FLS : unit pass | Arg : t pass

(** Raises [Invalid_argument "no flock"]. Marked [@inline never] so the
    raising code is never inlined into its callers. *)
let[@inline never] no_flock () = raise (Invalid_argument "no flock")

(** [get_flock fiber] reads the bundle stored under [flock_key] in the fiber
    local storage of [fiber].

    @raise Invalid_argument if no bundle is stored there. *)
let get_flock fiber =
  let stored = Fiber.FLS.get fiber flock_key ~default:Nothing in
  match stored with
  | Nothing -> no_flock ()
  | Bundle _ as flock -> flock

(** [await t fiber packed canceler outer pass] is the tail end of a join
    scope. It gives up one count on [t], waits for the [finished] trigger of
    [t], puts the per-fiber state back, and finally runs the error checks.

    - [packed] is installed as the computation of [fiber] before waiting, and
      is the computation [canceler] is detached from afterwards.
    - [outer] is written back under [flock_key] only when [pass] is [FLS]. *)
let await (type a) (Bundle r as t : t) fiber packed canceler outer
    (pass : a pass) =
  (* Drop one count. If it was the last one, [decr] itself signals
     [r.finished], so the wait below does not block forever. *)
  decr t;
  Fiber.set_computation fiber packed;
  (* Wait with [forbid] set to [true]; the value returned by [Fiber.exchange]
     is put back with [Fiber.set] right after the wait. *)
  let forbid = Fiber.exchange fiber ~forbid:true in
  Trigger.await r.finished |> ignore;
  Fiber.set fiber ~forbid;
  begin
    (* Only the [FLS] mode touched [flock_key], so only it needs restoring. *)
    match pass with FLS -> Fiber.FLS.set fiber flock_key outer | Arg -> ()
  end;
  let (Packed parent) = packed in
  Computation.detach parent canceler;
  (* NOTE(review): presumably [Control.Errors.check] raises when errors were
     pushed, and [Fiber.check] raises when the fiber has been canceled;
     confirm against their definitions. *)
  Control.Errors.check r.errors;
  Fiber.check fiber

(** [join_after_pass ?callstack ?on_return fn pass] creates a fresh bundle,
    switches the current fiber over to the bundle's computation, runs [fn],
    and then calls [await] before either returning the value of [fn] or
    re-raising its exception with the original backtrace.

    - [callstack]: when omitted or [<= 0], [0] is stored; otherwise the value
      is clamped to the largest number the callstack bits of [config] can
      hold. The stored value is only read back when the bundle is terminated
      on return.
    - [on_return]: [`Terminate] sets [config_terminated_bit], which makes the
      bundle get terminated once [fn] returns normally. [`Wait] and omission
      leave the bit clear.
    - [pass]: with [FLS] the bundle is stored under [flock_key] and [fn]
      receives [()]; with [Arg] the bundle is passed to [fn] directly. *)
let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
    =
  (* The sequence of operations below ensures that nothing is leaked. *)
  let (Bundle r as t : t) =
    let terminated =
      match on_return with
      | None | Some `Wait -> 0
      | Some `Terminate -> config_terminated_bit
    in
    let callstack =
      match callstack with
      | None -> 0
      | Some n ->
          if n <= 0 then 0
          else
            (* Clamp to the field width, then move into the field position. *)
            Int.min n (config_callstack_mask lsr config_callstack_shift)
            lsl config_callstack_shift
    in
    (* The count starts at one unit; [await] gives that unit up via [decr]. *)
    let config = Atomic.make (config_one lor callstack lor terminated) in
    let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
    let errors = Control.Errors.create () in
    let finished = Trigger.create () in
    Bundle { config; bundle; errors; finished }
  in
  let fiber = Fiber.current () in
  (* Remember what was stored under [flock_key] so [await] can restore it.
     In [Arg] mode the key is never written, so there is nothing to save. *)
  let outer =
    match pass with
    | Arg -> Nothing
    | FLS -> Fiber.FLS.get fiber flock_key ~default:Nothing
  in
  let (Packed parent as packed) = Fiber.get_computation fiber in
  let (Packed bundle) = r.bundle in
  let canceler = Computation.attach_canceler ~from:parent ~into:bundle in
  (* Ideally there should be no poll point between [attach_canceler] and the
     [match ... with] below. *)
  match
    Fiber.set_computation fiber r.bundle;
    fn (match pass with FLS -> Fiber.FLS.set fiber flock_key t | Arg -> t)
  with
  | value ->
      let config = Atomic.get r.config in
      if config land config_terminated_bit <> 0 then begin
        (* Recover the callstack depth that was packed into [config]. *)
        let callstack =
          let n =
            (config land config_callstack_mask) lsr config_callstack_shift
          in
          if n = 0 then None else Some n
        in
        terminate ?callstack t
      end;
      await t fiber packed canceler outer pass;
      value
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      (* [error] terminates the bundle and records [exn], unless [exn] is
         [Control.Terminate]. *)
      error t exn bt;
      await t fiber packed canceler outer pass;
      Printexc.raise_with_backtrace exn bt

(** [incr t backoff] adds one unit to the count in the [config] word of
    bundle [t], using a compare-and-set loop that retries after
    [Backoff.once backoff] whenever the word changed in between.

    @raise Invalid_argument if the count is already zero. *)
let rec incr (Bundle { config; _ } as t : t) backoff =
  let current = Atomic.get config in
  if config_one <= current then begin
    let bumped = current + config_one in
    if not (Atomic.compare_and_set config current bumped) then
      incr t (Backoff.once backoff)
  end
  else completed ()

(** [fork_as_promise_pass t thunk pass] registers one more user on bundle [t],
    creates a child computation together with a fiber running on it, spawns
    that fiber, and returns the child computation.

    The spawned fiber runs [thunk ()]. On a normal return the child
    computation is completed with the value. On an exception the child
    computation is canceled with it and the exception is reported to the
    bundle via [error]. In both cases the fiber then detaches [canceler] from
    the bundle's computation and gives its count back with [decr].

    With [pass = FLS] the bundle is stored under [flock_key] in the new fiber,
    and the fiber body re-reads it from there with [get_flock]. With
    [pass = Arg] the fiber body uses [t] directly.

    @raise Invalid_argument via [incr] if the count of [t] is already zero. *)
let fork_as_promise_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
  (* The sequence of operations below ensures that nothing is leaked. *)
  incr t Backoff.default;
  try
    let child = Computation.create ~mode:`LIFO () in
    let fiber = Fiber.create ~forbid:false child in
    let (Packed bundle) = r.bundle in
    let canceler = Computation.attach_canceler ~from:bundle ~into:child in
    let main =
      match pass with
      | FLS ->
          Fiber.FLS.set fiber flock_key t;
          fun fiber ->
            begin
              match thunk () with
              | value -> Computation.return child value
              | exception exn ->
                  let bt = Printexc.get_raw_backtrace () in
                  Computation.cancel child exn bt;
                  error (get_flock fiber) exn bt
            end;
            (* These bindings shadow the outer [r] and [t] with the bundle
               read back from the fiber's local storage. *)
            let (Bundle r as t : t) = get_flock fiber in
            let (Packed bundle) = r.bundle in
            Computation.detach bundle canceler;
            decr t
      | Arg ->
          fun _ ->
            begin
              match thunk () with
              | value -> Computation.return child value
              | exception exn ->
                  let bt = Printexc.get_raw_backtrace () in
                  Computation.cancel child exn bt;
                  error t exn bt
            end;
            let (Packed bundle) = r.bundle in
            Computation.detach bundle canceler;
            decr t
    in
    Fiber.spawn fiber main;
    child
  with canceled_exn ->
    (* We don't worry about detaching the [canceler], because at this point we
       know the bundle computation has completed or there is something more
       serious. *)
    (* Give back the count taken by [incr] above before re-raising. *)
    decr t;
    raise canceled_exn

(** [fork_pass t thunk pass] registers one more user on bundle [t], creates a
    fiber directly on the bundle's computation, and spawns it.

    The spawned fiber runs [thunk ()]. Any exception it raises is reported to
    the bundle via [error] together with the current raw backtrace. After
    that the fiber gives its count back with [decr].

    With [pass = FLS] the bundle is stored under [flock_key] in the new fiber,
    and the fiber body re-reads it from there with [get_flock]. With
    [pass = Arg] the fiber body uses [t] directly.

    @raise Invalid_argument via [incr] if the count of [t] is already zero. *)
let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
  (* The sequence of operations below ensures that nothing is leaked. *)
  incr t Backoff.default;
  try
    let fiber = Fiber.create_packed ~forbid:false r.bundle in
    let main =
      match pass with
      | FLS ->
          Fiber.FLS.set fiber flock_key t;
          fun fiber ->
            begin
              try thunk ()
              with exn ->
                error (get_flock fiber) exn (Printexc.get_raw_backtrace ())
            end;
            decr (get_flock fiber)
      | Arg ->
          fun _ ->
            begin
              try thunk ()
              with exn -> error t exn (Printexc.get_raw_backtrace ())
            end;
            decr t
    in
    Fiber.spawn fiber main
  with canceled_exn ->
    (* Give back the count taken by [incr] above before re-raising. *)
    decr t;
    raise canceled_exn

(* *)

(** [is_running t] is [Computation.is_running] applied to the computation of
    bundle [t]. *)
let is_running (Bundle r : t) =
  let (Packed computation) = r.bundle in
  Computation.is_running computation

(** [join_after ?callstack ?on_return fn] is [join_after_pass] in [Arg] mode:
    a fresh bundle is created and passed to [fn] as its argument. The call
    returns the value of [fn], or re-raises its exception, only after
    [join_after_pass] has finished awaiting the bundle. *)
let join_after ?callstack ?on_return fn =
  join_after_pass ?callstack ?on_return fn Arg

(** [fork t thunk] is [fork_pass t thunk Arg]: it spawns a fiber on the
    computation of bundle [t] that runs [thunk ()]. *)
let fork t thunk = fork_pass t thunk Arg
(** [fork_as_promise t thunk] is [fork_as_promise_pass t thunk Arg]: it spawns
    a fiber that runs [thunk ()] and returns the child computation that will
    hold the outcome. *)
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg

(** [unsafe_incr t] unconditionally adds one unit to the count in the
    [config] word of bundle [t]. Unlike [incr], it does not check whether the
    count had already dropped to zero. *)
let unsafe_incr (Bundle { config; _ } : t) =
  let (_ : int) = Atomic.fetch_and_add config config_one in
  ()
OCaml

Innovation. Community. Security.