package eio

  1. Overview
  2. Docs

Source file promise.ml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
(* Note on thread-safety

   Promises can be shared between domains, so everything here must be thread-safe.

   Wrapping everything in a mutex would be one way to do that, but that makes reads
   slow, and only one domain would be able to read at a time.

   Instead, we use an Atomic to hold the state, plus an additional mutex for the waiters
   while in the Unresolved state. This makes resolved promises faster (at the cost of
   making operations on unresolved promises a bit slower). It also makes reasoning about
   the code more fun.

   We can think of atomics and mutexes as "boxes", containing values and
   invariants. To use them, you open the box to get access to the contents,
   then close the box afterwards, restoring the invariant. For mutexes,
   open/close is lock/unlock. For atomics, every operation implicitly opens and
   closes the box. Any number of callers can share a reference to the box
   itself; the runtime ensures a box can only be opened by one user at a time.

   We can hold a full reference to something (meaning no-one else has access to it
   and we can mutate it), or a fraction (giving us read-only access but also
   ensuring that no-one else can mutate it either). *)

type 'a state =
  | Resolved of 'a
  | Unresolved of 'a Waiters.t * Mutex.t
  (* The Unresolved state's mutex box contains:
     - Full access to the Waiters.
     - Half access to the promise's state.
     - The invariant that if the promise is resolved then the waiters list is empty. *)

type !'a promise = {
  id : Ctf.id;

  state : 'a state Atomic.t;
  (* This atomic box contains either:
     - A non-zero share of the reference to the Resolved state.
     - A half-share of the reference to the Unresolved state. *)
}

type +!'a t
type -!'a u

type 'a or_exn = ('a, exn) result t

let to_public_promise : 'a promise -> 'a t = Obj.magic
let to_public_resolver : 'a promise -> 'a u = Obj.magic
let of_public_promise : 'a t -> 'a promise = Obj.magic
let of_public_resolver : 'a u -> 'a promise = Obj.magic

let create_with_id id =
  let t = {
    id;
    state = Atomic.make (Unresolved (Waiters.create (), Mutex.create ()));
  } in
  to_public_promise t, to_public_resolver t

let create ?label () =
  let id = Ctf.mint_id () in
  Ctf.note_created ?label id Ctf.Promise;
  create_with_id id

let create_resolved x =
  let id = Ctf.mint_id () in
  Ctf.note_created id Ctf.Promise;
  to_public_promise { id; state = Atomic.make (Resolved x) }

let await t =
  let t = of_public_promise t in
  match Atomic.get t.state with
  (* If the atomic is resolved, we take a share of that reference and return
     the remainder to the atomic (which will still be non-zero). We can then
     continue to know that the promise is resolved after the [Atomic.get]. *)
  | Resolved x ->
    Ctf.note_read t.id;
    x
  | Unresolved (q, mutex) ->
    (* We discovered that the promise was unresolved, but we can't be sure it still is,
       since we had to return the half-share reference to the atomic. So the [get] is
       just to get access to the mutex. *)
    Ctf.note_try_read t.id;
    Mutex.lock mutex;
    (* Having opened the mutex, we have:
       - Access to the waiters.
       - Half access to the promise's state (so we know it can't change until we close the mutex).
       - The mutex invariant. *)
    match Atomic.get t.state with
    | Unresolved _ ->
      (* The promise is unresolved, and can't change while we hold the mutex.
         It's therefore safe to add a new waiter (and let [Waiters.await] close the mutex). *)
      Waiters.await ~mutex:(Some mutex) q t.id
    (* Otherwise, the promise was resolved by the time we took the lock.
       Release the lock (which is fine, as we didn't change anything). *)
    | Resolved x ->
      Mutex.unlock mutex;
      Ctf.note_read t.id;
      x

let await_exn t =
  match await t with
  | Ok x -> x
  | Error ex -> raise ex

let resolve t v =
  let rec resolve' t v =
    match Atomic.get t.state with
    | Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
    | Unresolved (q, mutex) as prev ->
      (* The above [get] just gets us access to the mutex;
         By the time we get here, the promise may have become resolved. *)
      Mutex.lock mutex;
      (* Having opened the mutex, we have:
         - Access to the waiters.
         - Half access to the promise's state (so we know it can't change until we close the mutex).
         - The mutex invariant.
         Now we open the atomic again, getting the other half access. Together,
         this gives us full access to the state (i.e. no-one else can be using
         it), allowing us to change it.
         Note: we don't actually need an atomic CAS here, just a get and a set
         would do, but this seems simplest. *)
      if Atomic.compare_and_set t.state prev (Resolved v) then (
        (* The atomic now has half-access to the fullfilled state (which counts
           as non-zero), and we have the other half. Now we need to restore the
           mutex invariant by clearing the wakers. *)
        Ctf.note_resolved t.id ~ex:None;
        Waiters.wake_all q v;
        Mutex.unlock mutex
      ) else (
        (* Otherwise, the promise was already resolved when we opened the mutex.
           Close it without any changes and retry. *)
        Mutex.unlock mutex;
        resolve' t v
      )
  in
  resolve' (of_public_resolver t) v

let resolve_ok    u x = resolve u (Ok x)
let resolve_error u x = resolve u (Error x)

let peek t =
  let t = of_public_promise t in
  match Atomic.get t.state with
  | Unresolved _ -> None
  | Resolved x -> Some x

let id t =
  let t = of_public_promise t in
  t.id

let is_resolved t =
  Option.is_some (peek t)
OCaml

Innovation. Community. Security.